Source code for driada.utils.signals

"""
Signal Processing Utilities
===========================

This module contains utility functions for signal generation, analysis,
and filtering. It consolidates functionality from the former signals module.

Functions
---------
brownian : Generate Brownian motion (Wiener process)
approximate_entropy : Calculate approximate entropy of a signal
filter_1d_timeseries : Filter a 1D time series using various methods
filter_signals : Filter multiple signals (2D array)
adaptive_filter_signals : Adaptively filter based on SNR
"""

import numpy as np
from scipy.stats import norm
from scipy.signal import savgol_filter
from scipy.ndimage import gaussian_filter1d
import pywt
from typing import Optional, Union, List

try:
    from .data import check_positive, check_nonnegative
except ImportError:
    # For standalone module execution (e.g., doctests)
    from driada.utils.data import check_positive, check_nonnegative


[docs] def brownian( x0: Union[float, np.ndarray], n: int, dt: float = 1.0, delta: float = 1.0, out: Optional[np.ndarray] = None, ) -> np.ndarray: """ Generate an instance of Brownian motion (i.e. the Wiener process). The Brownian motion follows: X(t) = X(0) + N(0, delta**2 * t; 0, t) where N(a,b; t0, t1) is a normally distributed random variable with mean a and variance b. The parameters t0 and t1 make explicit the statistical independence of N on different time intervals. Written as an iteration scheme: X(t + dt) = X(t) + N(0, delta**2 * dt; t, t+dt) Parameters ---------- x0 : float or np.ndarray The initial condition(s) (i.e. position(s)) of the Brownian motion. If array, each value is treated as an initial condition. n : int The number of steps to take. dt : float, optional The time step. Default is 1.0. delta : float, optional Determines the "speed" of the Brownian motion. The random variable of the position at time t, X(t), has a normal distribution whose mean is the position at time t=0 and whose variance is delta**2*t. Default is 1.0. out : np.ndarray, optional If provided, specifies the array in which to put the result. If None, a new numpy array is created and returned. Returns ------- np.ndarray Array of floats with shape `x0.shape + (n,)`. Note that the initial value `x0` is not included in the returned array. Raises ------ ValueError If n <= 0, dt <= 0, or delta < 0 Examples -------- >>> # Generate single Brownian motion path >>> path = brownian(0.0, 1000, dt=0.01) >>> path.shape (1000,) >>> # Generate multiple paths with different initial conditions >>> paths = brownian([0.0, 1.0, -1.0], 1000, dt=0.01) >>> paths.shape (3, 1000)""" # Validate inputs if not isinstance(n, (int, np.integer)) or n <= 0: raise ValueError(f"n must be a positive integer, got {n}") check_positive(dt=dt) check_nonnegative(delta=delta) x0 = np.asarray(x0) # For each element of x0, generate a sample of n numbers from a # normal distribution. r = norm.rvs(size=x0.shape + (n,), scale=delta * np.sqrt(dt)) # If `out` was not given, create an output array. if out is None: out = np.empty(r.shape) # This computes the Brownian motion by forming the cumulative sum of # the random samples. np.cumsum(r, axis=-1, out=out) # Add the initial condition. out += np.expand_dims(x0, axis=-1) return out
[docs] def approximate_entropy(U: Union[List, np.ndarray], m: int, r: float) -> float: """ Calculate approximate entropy (ApEn) of a signal. Approximate entropy is a regularity statistic that quantifies the unpredictability of fluctuations in a time series. A time series containing many repetitive patterns has a relatively small ApEn; a less predictable process has a higher ApEn. Parameters ---------- U : array-like Input signal/time series. Must have length >= m + 2. m : int Pattern length. Common values are 1 or 2. r : float Tolerance threshold for pattern matching. Typically 0.1-0.25 times the standard deviation of the data. Returns ------- float The approximate entropy value. Higher values indicate more randomness/complexity. Raises ------ ValueError If length of U < m + 2, m < 1, or r < 0 Notes ----- The algorithm: 1. Fix m (pattern length) and r (tolerance) 2. Look at patterns of length m and m+1 3. Count pattern matches within tolerance r 4. Calculate the logarithmic frequency of patterns 5. Return the difference between m and m+1 pattern frequencies Complexity is O(N²). For long signals consider downsampling. References ---------- Pincus, S. M. (1991). Approximate entropy as a measure of system complexity. Proceedings of the National Academy of Sciences, 88(6), 2297-2301. Examples -------- >>> # Regular signal has low entropy >>> regular = [1, 2, 3, 1, 2, 3, 1, 2, 3] >>> apen = approximate_entropy(regular, m=2, r=0.5) >>> apen < 0.1 True >>> # Random signal has high entropy >>> import numpy as np >>> random_signal = np.random.randn(100) >>> apen = approximate_entropy(random_signal, m=2, r=0.2 * np.std(random_signal)) >>> apen > 0.5 # Typically true for random signals True""" # Validate inputs U = np.asarray(U) N = len(U) if N < m + 2: raise ValueError(f"Signal length ({N}) must be >= m + 2 ({m + 2})") if m < 1: raise ValueError(f"Pattern length m must be >= 1, got {m}") check_nonnegative(r=r) def _maxdist(x_i, x_j): """Calculate maximum distance between two patterns. Parameters ---------- x_i : list First pattern of values x_j : list Second pattern of values Returns ------- float Maximum absolute difference between corresponding elements """ return max([abs(ua - va) for ua, va in zip(x_i, x_j)]) def _phi(m): """Calculate phi(m) - the logarithmic frequency of patterns. Parameters ---------- m : int Pattern length Returns ------- float Average logarithm of matching pattern frequencies """ # Extract all patterns of length m patterns = [[U[j] for j in range(i, i + m)] for i in range(N - m + 1)] # Count matches for each pattern C = [ len([1 for x_j in patterns if _maxdist(x_i, x_j) <= r]) / (N - m + 1.0) for x_i in patterns ] # Avoid log(0) by adding small epsilon phi_sum = 0.0 for c in C: if c > 0: phi_sum += np.log(c) else: # When no patterns match, use a very small probability phi_sum += np.log(1e-10) return phi_sum / (N - m + 1.0) # Approximate entropy is the difference between phi(m+1) and phi(m) return abs(_phi(m + 1) - _phi(m))
[docs] def filter_1d_timeseries(data: np.ndarray, method: str = "gaussian", **kwargs) -> np.ndarray: """ Filter a 1D time series using various denoising methods. This is the core filtering function used by TimeSeries.filter() method. Supports Gaussian smoothing, Savitzky-Golay filtering, and wavelet denoising. Parameters ---------- data : ndarray 1D time series data method : str Filtering method: 'gaussian', 'savgol', 'wavelet', or 'none' **kwargs : dict Method-specific parameters: - gaussian: sigma (default: 1.0) - standard deviation for Gaussian kernel - savgol: window_length (default: 5), polyorder (default: 2) - wavelet: wavelet (default: 'db4'), level (default: auto), mode (default: 'smooth'), threshold_method (default: 'mad') Returns ------- ndarray Filtered 1D time series Raises ------ ValueError If unknown method or invalid parameters (e.g., polyorder >= window_length) Examples -------- >>> import numpy as np >>> np.random.seed(42) >>> # Create noisy signal >>> t = np.linspace(0, 1, 100) >>> data = np.sin(2 * np.pi * 5 * t) + 0.2 * np.random.randn(100) >>> # Gaussian smoothing for general noise reduction >>> filtered = filter_1d_timeseries(data, method='gaussian', sigma=1.5) >>> filtered.shape (100,) >>> # Savitzky-Golay for preserving peaks while smoothing >>> filtered = filter_1d_timeseries(data, method='savgol', window_length=5) >>> filtered.shape (100,) >>> # Wavelet denoising for multi-scale noise removal >>> filtered = filter_1d_timeseries(data, method='wavelet', wavelet='db4') >>> filtered.shape (100,)""" if method == "none": return data.copy() # Ensure we have a 1D array data = np.asarray(data).ravel() if method == "gaussian": # Gaussian filter: good for general smoothing sigma = kwargs.get("sigma", 1.0) return gaussian_filter1d(data, sigma=sigma) elif method == "savgol": # Savitzky-Golay: preserves features like peaks better than Gaussian window_length = kwargs.get("window_length", 5) polyorder = kwargs.get("polyorder", 2) # Ensure window_length is odd if window_length % 2 == 0: window_length += 1 # Check if signal is long enough if len(data) <= window_length: return data.copy() # Validate polyorder if polyorder >= window_length: raise ValueError( f"polyorder ({polyorder}) must be less than window_length ({window_length})" ) return savgol_filter(data, window_length, polyorder) elif method == "wavelet": # Wavelet denoising: excellent for multi-scale noise removal wavelet = kwargs.get("wavelet", "db4") # Daubechies 4 is a good default level = kwargs.get("level", None) mode = kwargs.get("mode", "smooth") # boundary handling threshold_method = kwargs.get("threshold_method", "mad") n = len(data) # Determine decomposition level if not specified max_level = pywt.dwt_max_level(n, wavelet) if level is None: # Automatic level selection: balance between noise removal and signal preservation level = min(max_level, max(1, int(np.log2(n)) - 4)) elif level > max_level: level = max_level # Perform wavelet decomposition coeffs = pywt.wavedec(data, wavelet, mode=mode, level=level) # Apply thresholding to detail coefficients (not the approximation) if threshold_method == "mad": # MAD-based threshold: robust to outliers for j in range(1, len(coeffs)): detail_coeffs = coeffs[j] # Estimate noise level using Median Absolute Deviation sigma = np.median(np.abs(detail_coeffs)) / 0.6745 # Universal threshold (Donoho & Johnstone) threshold = sigma * np.sqrt(2 * np.log(n)) # Soft thresholding: shrinks coefficients smoothly coeffs[j] = pywt.threshold(detail_coeffs, threshold, mode="soft") elif threshold_method == "sure": # SURE (Stein's Unbiased Risk Estimate): data-adaptive threshold for j in range(1, len(coeffs)): threshold = np.std(coeffs[j]) * np.sqrt(2 * np.log(len(coeffs[j]))) coeffs[j] = pywt.threshold(coeffs[j], threshold, mode="soft") # Reconstruct the signal reconstructed = pywt.waverec(coeffs, wavelet, mode=mode) # Handle potential length mismatch return reconstructed[:n] else: raise ValueError( f"Unknown filtering method: {method}. " f"Choose from: 'gaussian', 'savgol', 'wavelet', 'none'" )
[docs] def filter_signals(data: np.ndarray, method: str = "gaussian", **kwargs) -> np.ndarray: """ Filter multiple signals (2D array compatibility wrapper). Parameters ---------- data : ndarray Data with shape (n_signals, n_timepoints) or 1D array method : str Filtering method: 'gaussian', 'savgol', 'wavelet', or 'none' **kwargs : dict Method-specific parameters (see filter_1d_timeseries) Returns ------- ndarray Filtered data with same shape as input Raises ------ ValueError If unknown method or invalid parameters Examples -------- >>> # Filter multiple signals >>> signals = np.random.randn(10, 1000) # 10 signals, 1000 points each >>> filtered = filter_signals(signals, method='gaussian', sigma=2.0) >>> # Also works with 1D arrays >>> signal = np.random.randn(1000) >>> filtered = filter_signals(signal, method='savgol')""" if data.ndim == 1: return filter_1d_timeseries(data, method=method, **kwargs) filtered_data = np.zeros_like(data) for i in range(data.shape[0]): filtered_data[i] = filter_1d_timeseries(data[i], method=method, **kwargs) return filtered_data
[docs] def adaptive_filter_signals(data: np.ndarray, snr_threshold: float = 2.0) -> np.ndarray: """ Adaptively filter signals based on signal-to-noise ratio. Parameters ---------- data : ndarray Data with shape (n_signals, n_timepoints) snr_threshold : float SNR threshold for determining filter strength. Default 2.0. Returns ------- ndarray Adaptively filtered data with same shape as input Raises ------ ValueError If data is not 2D or snr_threshold <= 0 Notes ----- Uses simple binary threshold: strong filtering (sigma=2.0) for low SNR, light filtering (sigma=0.5) for high SNR. Examples -------- >>> # Adaptively filter based on estimated SNR >>> signals = np.random.randn(5, 1000) >>> filtered = adaptive_filter_signals(signals, snr_threshold=3.0) >>> # Lower threshold applies stronger filtering to more signals >>> filtered = adaptive_filter_signals(signals, snr_threshold=1.0)""" # Validate inputs if data.ndim != 2: raise ValueError(f"Data must be 2D, got shape {data.shape}") check_positive(snr_threshold=snr_threshold) filtered_data = np.zeros_like(data) for i in range(data.shape[0]): signal = data[i] # Estimate SNR (simple approach) signal_power = np.var(signal) noise_power = np.var(np.diff(signal)) # High-freq noise estimate snr = signal_power / (noise_power + 1e-10) # Adaptive filtering based on SNR if snr < snr_threshold: # High noise - stronger filtering sigma = 2.0 else: # Low noise - lighter filtering sigma = 0.5 filtered_data[i] = gaussian_filter1d(signal, sigma=sigma) return filtered_data