"""Ordinal partition network from 1D time series."""
import math
import numpy as np
import scipy.sparse as sp
from ..network.net_base import Network
from .embedding import takens_embedding
_MAX_D = 7
def _encode_patterns(ranks, d):
"""Encode ordinal rank patterns as Lehmer codes.
Parameters
----------
ranks : ndarray of shape (d, N_embedded)
Argsort of each embedded vector (column-wise ranks).
d : int
Embedding dimension.
Returns
-------
pattern_ids : ndarray of shape (N_embedded,), dtype int32
Integer ID for each ordinal pattern. Range [0, d!).
"""
n = ranks.shape[1]
ids = np.zeros(n, dtype=np.int64)
for i in range(d):
# Lehmer code: count how many elements after position i
# have a smaller rank value
count = np.zeros(n, dtype=np.int64)
for j in range(i + 1, d):
count += (ranks[j] < ranks[i]).astype(np.int64)
ids = ids * (d - i) + count
return ids.astype(np.int32)
def _build_transition_matrix(pattern_ids, d):
"""Build directed weighted transition matrix between ordinal patterns.
Parameters
----------
pattern_ids : 1D int array of length N_embedded
Pattern IDs from _encode_patterns.
d : int
Embedding dimension.
Returns
-------
adj : sparse CSR matrix of shape (n_states, n_states)
Row-normalized transition probabilities. Rows for patterns whose
only outgoing transitions are self-loops will sum to 0 (not 1),
because self-loops are removed before normalization to stay
consistent with the Network base class.
"""
n_states = math.factorial(d)
n = len(pattern_ids)
if n < 2:
return sp.csr_matrix((n_states, n_states))
# Count transitions
src = pattern_ids[:-1]
dst = pattern_ids[1:]
data = np.ones(len(src), dtype=np.float64)
adj = sp.coo_matrix((data, (src, dst)), shape=(n_states, n_states))
adj = adj.tocsr()
# Remove self-loops before normalizing — Network base class strips
# diagonal entries, so we exclude them here to keep row sums = 1.
adj.setdiag(0)
adj.eliminate_zeros()
# Row-normalize to get transition probabilities
row_sums = np.array(adj.sum(axis=1)).ravel()
row_sums[row_sums == 0] = 1.0 # avoid division by zero
diag_inv = sp.diags(1.0 / row_sums)
adj = diag_inv @ adj
return adj
[docs]
class OrdinalPartitionNetwork(Network):
"""Ordinal partition network from 1D time series.
Embeds with Takens delay embedding, ranks elements to get
ordinal patterns. Nodes = unique patterns, directed weighted
edges = transition probabilities between consecutive patterns.
Parameters
----------
data : array-like, 1D
Raw time series values.
d : int
Embedding dimension (pattern length). Must be <= 7.
tau : int
Embedding delay.
Notes
-----
Tied values in the time series are ranked by position (stable argsort),
so equal values receive deterministic but position-dependent ordinal
patterns. This is standard practice (Bandt & Pompe, 2002).
"""
[docs]
def __init__(self, data, d, tau, create_nx_graph=False):
data = np.asarray(data, dtype=np.float64)
if data.ndim != 1:
raise ValueError(
f"Data must be 1D time series, got {data.ndim}D."
)
if len(data) < 2:
raise ValueError(
f"Time series must have at least 2 data points, got {len(data)}."
)
if not np.all(np.isfinite(data)):
raise ValueError("Data contains NaN or Inf values.")
if d > _MAX_D:
raise ValueError(
f"Embedding dimension d={d} too large (d! = {math.factorial(d)} "
f"nodes). Must be <= {_MAX_D}."
)
if d < 2:
raise ValueError(f"Embedding dimension d must be >= 2, got {d}.")
emb = takens_embedding(data, tau=tau, m=d)
ranks = np.argsort(emb, axis=0)
pattern_ids = _encode_patterns(ranks, d)
adj = _build_transition_matrix(pattern_ids, d)
Network.__init__(
self, adj=adj, preprocessing=None,
create_nx_graph=create_nx_graph, directed=True,
)
self._data = data
self.d = d
self.tau = tau
self._pattern_ids = pattern_ids
@property
def permutation_entropy(self):
"""Permutation entropy normalized to [0, 1].
Shannon entropy of pattern visit frequencies divided by log2(d!).
High (~1) = complex/random. Low (~0) = regular/periodic.
"""
n_states = math.factorial(self.d)
freqs = np.bincount(self._pattern_ids, minlength=n_states)
freqs = freqs[freqs > 0].astype(np.float64)
freqs = freqs / freqs.sum()
h = -np.sum(freqs * np.log2(freqs))
h_max = np.log2(n_states)
if h_max == 0:
return 0.0
return max(0.0, h / h_max)
@property
def missing_patterns(self):
"""Number of d! ordinal patterns never visited."""
n_states = math.factorial(self.d)
return n_states - len(np.unique(self._pattern_ids))
@property
def timeseries_data(self):
"""Original time series data."""
return self._data