"""Visibility graph from 1D time series."""
import warnings
import numpy as np
import scipy.sparse as sp
from ..network.net_base import Network
def _symmetrize_edges(rows, cols, n):
"""Build symmetric sparse binary adjacency from directed edge list."""
ones = np.ones(len(rows), dtype=np.float64)
all_rows = np.concatenate([rows, cols])
all_cols = np.concatenate([cols, rows])
all_data = np.concatenate([ones, ones])
adj = sp.csr_matrix((all_data, (all_rows, all_cols)), shape=(n, n))
adj.setdiag(0)
adj.eliminate_zeros()
adj.data[:] = 1.0
return adj
def _build_hvg(data):
"""Build Horizontal Visibility Graph adjacency. O(N) via stack.
Two points i, j are connected if all intermediate values
x_k < min(x_i, x_j) for i < k < j (Luque et al. 2009).
Uses a monotone stack: process points left to right, maintaining
a stack of indices with non-increasing values. Each point connects
to the stack top (nearest left point >= it) and to each popped
element (which it dominates).
"""
n = len(data)
rows, cols = [], []
stack = []
for i in range(n):
while stack and data[stack[-1]] < data[i]:
j = stack.pop()
rows.append(j)
cols.append(i)
if stack:
rows.append(stack[-1])
cols.append(i)
stack.append(i)
rows = np.array(rows, dtype=np.int32)
cols = np.array(cols, dtype=np.int32)
return _symmetrize_edges(rows, cols, n)
def _build_nvg(data):
"""Build Natural Visibility Graph adjacency. O(N^2) to O(N^3).
Two points i, j are connected if for all k with i < k < j:
(x_k - x_i) / (k - i) < (x_j - x_i) / (j - i)
For N > ~2000, this is slow (pure Python loops). Use
method='horizontal' for long time series.
"""
n = len(data)
rows, cols = [], []
for i in range(n - 1):
for j in range(i + 1, n):
slope_ij = (data[j] - data[i]) / (j - i)
visible = True
for k in range(i + 1, j):
slope_ik = (data[k] - data[i]) / (k - i)
if slope_ik >= slope_ij:
visible = False
break
if visible:
rows.append(i)
cols.append(j)
rows = np.array(rows, dtype=np.int32)
cols = np.array(cols, dtype=np.int32)
return _symmetrize_edges(rows, cols, n)
[docs]
class VisibilityGraph(Network):
"""Visibility graph from 1D time series.
Two time points are connected if no intermediate value blocks
the line of sight between them (NVG), or if all intermediate
values are below both endpoints (HVG).
Parameters
----------
data : array-like, 1D
Raw time series values.
method : {'horizontal', 'natural'}, default='horizontal'
'horizontal' (HVG): O(N) via monotone stack. Default.
'natural' (NVG): O(N^2) pairwise line-of-sight. Slow for N > ~2000.
directed : bool, default=False
If True, edges point forward in time only.
"""
[docs]
def __init__(self, data, method='horizontal', directed=False, create_nx_graph=False):
data = np.asarray(data, dtype=np.float64)
if data.ndim != 1:
raise ValueError(
f"Data must be 1D time series, got {data.ndim}D."
)
if len(data) < 2:
raise ValueError(
f"Time series must have at least 2 data points, got {len(data)}."
)
if not np.all(np.isfinite(data)):
raise ValueError("Data contains NaN or Inf values.")
if method == 'natural' and len(data) > 2000:
warnings.warn(
f"NVG with {len(data)} points will be slow (O(N^2)). "
"Consider method='horizontal' for large time series.",
stacklevel=2,
)
if method == 'horizontal':
adj = _build_hvg(data)
elif method == 'natural':
adj = _build_nvg(data)
else:
raise ValueError(
f"Unknown method '{method}'. Choose 'horizontal' or 'natural'."
)
if directed:
adj = sp.triu(adj, k=1, format='csr')
Network.__init__(
self, adj=adj, preprocessing=None,
create_nx_graph=create_nx_graph, directed=directed,
)
self._data = data
self._method = method
@property
def timeseries_data(self):
"""Original time series data."""
return self._data
@property
def vg_method(self):
"""Construction method ('horizontal' or 'natural')."""
return self._method