Source code for driada.recurrence.embedding

"""Takens delay embedding and parameter estimation for recurrence analysis."""

import numpy as np


[docs] def takens_embedding(data, tau, m): """Construct time-delay embedding matrix. Given a 1D time series x of length N, constructs the matrix:: [[x[0], x[1], ..., x[N_e-1] ], [x[tau], x[tau+1], ..., x[tau+N_e-1] ], ... [x[(m-1)*tau], ... x[N-1] ]] where N_e = N - (m-1)*tau. Parameters ---------- data : array-like, 1D Time series of length N. tau : int Time delay in samples. Must be >= 1. m : int Embedding dimension. Must be >= 2. Returns ------- ndarray of shape (m, N_embedded) Delay-embedded matrix. Rows are embedding dimensions, columns are time points. Matches ProximityGraph (features, samples) convention. Raises ------ ValueError If data is not 1D, or series is too short for given tau and m. """ data = np.asarray(data, dtype=float) if data.ndim != 1: raise ValueError(f"Data must be 1D array, got {data.ndim}D with shape {data.shape}") n = len(data) n_embedded = n - (m - 1) * tau if n_embedded < 1: raise ValueError( f"Time series of length {n} is too short for tau={tau}, m={m}. " f"Need at least {(m - 1) * tau + 1} points." ) indices = np.arange(n_embedded)[np.newaxis, :] + (np.arange(m) * tau)[:, np.newaxis] return data[indices]
[docs] def estimate_tau(data, max_shift=100, method='first_minimum', estimator='gcmi', **estimator_kwargs): """Estimate optimal time delay for Takens embedding. Parameters ---------- data : array-like, 1D Time series. max_shift : int, optional Maximum lag to evaluate. Default: 100. method : {'first_minimum', 'exponential_fit'} - 'first_minimum': First local minimum of time-delayed mutual information. - 'exponential_fit': Fit exponential decay to TDMI, return -1/slope. estimator : {'gcmi', 'ksg'}, optional MI estimator passed to get_tdmi(). Default: 'gcmi'. **estimator_kwargs Additional kwargs passed to get_tdmi() (e.g., nn=5 for KSG). Returns ------- int Optimal time delay in samples. Always >= 1. Raises ------ ValueError If method is not recognized. """ from driada.information.info_base import get_tdmi valid_methods = ('first_minimum', 'exponential_fit') if method not in valid_methods: raise ValueError( f"Unknown method '{method}'. Choose from {valid_methods}." ) # get_tdmi max_shift is exclusive, so pass max_shift+1 to include max_shift tdmi = get_tdmi(data, min_shift=1, max_shift=max_shift + 1, estimator=estimator, **estimator_kwargs) tdmi = np.array(tdmi) if method == 'first_minimum': return _tau_first_minimum(tdmi) else: return _tau_exponential_fit(tdmi)
def _tau_first_minimum(tdmi): """Find first local minimum of TDMI curve. Parameters ---------- tdmi : ndarray Time-delayed MI values. tdmi[0] corresponds to shift=1. Returns ------- int Time delay at the first local minimum (>= 1). """ if len(tdmi) < 2: return 1 for i in range(1, len(tdmi) - 1): if tdmi[i] < tdmi[i - 1] and tdmi[i] <= tdmi[i + 1]: return i + 1 # +1 because tdmi[0] corresponds to shift=1 # No minimum found — return shift where TDMI drops to 1/e of initial threshold = tdmi[0] / np.e below = np.where(tdmi < threshold)[0] if len(below) > 0: return int(below[0]) + 1 return max(1, len(tdmi) // 3) def _tau_exponential_fit(tdmi): """Estimate tau from TDMI via exponential-decay analogy. For a pure exponential I(s) = I(0)*exp(-s/tau), the integral of the normalised curve from 0 to infinity equals tau. We approximate this by trapezoidal integration of the normalised TDMI over its initial monotonically-decreasing segment. If log-linear fitting is feasible (>= 3 positive values in the decreasing segment), uses ``-1/slope`` from a least-squares fit; otherwise falls back to the integral approach. Parameters ---------- tdmi : ndarray Time-delayed MI values. tdmi[0] corresponds to shift=1. Returns ------- int Time delay estimated from exponential fit (>= 1). """ tdmi = np.array(tdmi, dtype=float) if len(tdmi) == 0 or tdmi[0] <= 0: return 1 # Find the initial monotonically decreasing segment dec_end = len(tdmi) for i in range(1, len(tdmi)): if tdmi[i] >= tdmi[i - 1]: dec_end = i break # Normalise and integrate (area-under-curve = tau for true exponential) segment = tdmi[:dec_end] normalised = segment / tdmi[0] shifts = np.arange(1, dec_end + 1) # Prepend the implicit point (shift=0, normalised=1.0) all_shifts = np.concatenate([[0], shifts]) all_norm = np.concatenate([[1.0], normalised]) tau = int(np.round(np.trapezoid(all_norm, all_shifts))) return max(1, tau)
[docs] def estimate_embedding_dim(data, tau, max_dim=10, r_tol=10.0, a_tol=2.0, fnn_threshold=0.05, return_fractions=False): """Estimate embedding dimension via false nearest neighbors (FNN). For each candidate dimension *m* (from 2 to *max_dim*), embeds the time series, finds each point's nearest neighbor, and checks whether that neighbor is still close in dimension *m* + 1. "False" neighbors appear close only because the attractor is projected into too few dimensions. Parameters ---------- data : array-like, 1D Time series. tau : int Time delay for embedding. max_dim : int, optional Maximum dimension to test. Default: 10. r_tol : float, optional Distance-ratio threshold (Kennel *et al.* 1992). Default: 10.0. a_tol : float, optional Absolute threshold relative to attractor size. Default: 2.0. fnn_threshold : float, optional FNN fraction below which the dimension is accepted. Default: 0.05 (5 %). return_fractions : bool, optional If True, also return per-dimension FNN fractions for diagnostic plots. Default: False. Returns ------- int or (int, list of (int, float)) Estimated embedding dimension (minimum 2). When *return_fractions* is True, returns ``(dim, fractions)`` where *fractions* is ``[(m, fnn_fraction), ...]`` for each tested dimension. """ from scipy.spatial import cKDTree data = np.asarray(data, dtype=float).ravel() attractor_size = np.std(data) if attractor_size == 0: if return_fractions: return 2, [(m, 0.0) for m in range(2, max_dim + 1)] return 2 # Minimum meaningful distance: pairs closer than this are considered # true (deterministic) neighbors and excluded from the FNN ratio test. dist_tol = attractor_size * 1e-8 best_dim = max_dim fractions = [] for m in range(2, max_dim + 1): emb_m = takens_embedding(data, tau, m).T # (N_embedded, m) emb_m1 = takens_embedding(data, tau, m + 1).T # (N_embedded_m1, m+1) n_m1 = emb_m1.shape[0] emb_m_trimmed = emb_m[:n_m1] tree = cKDTree(emb_m_trimmed) dists, indices = tree.query(emb_m_trimmed, k=2) nn_dists_m = dists[:, 1] # distance to nearest neighbor nn_indices = indices[:, 1] # index of nearest neighbor # Euclidean distance in m+1 dimensions nn_dists_m1 = np.linalg.norm( emb_m1 - emb_m1[nn_indices], axis=1 ) # Points with near-zero NN distance are deterministic repeats # (e.g. periodic signals); exclude them from the ratio criterion. valid = nn_dists_m > dist_tol if not np.any(valid): fnn_frac = 0.0 fractions.append((m, fnn_frac)) if best_dim == max_dim: best_dim = m if not return_fractions: return best_dim continue ratio = np.zeros(n_m1) ratio[valid] = ( np.abs(nn_dists_m1[valid] - nn_dists_m[valid]) / nn_dists_m[valid] ) criterion1 = ratio > r_tol criterion2 = (nn_dists_m1 / attractor_size) > a_tol fnn_frac = np.sum(criterion1 | criterion2) / n_m1 fractions.append((m, fnn_frac)) if fnn_frac < fnn_threshold and best_dim == max_dim: best_dim = m if not return_fractions: return best_dim if return_fractions: return best_dim, fractions return best_dim