Delay Optimization
Delay optimization for INTENSE analysis.
Provides functions to find optimal temporal delays between pairs of time series (e.g. neural signals and behavioral variables) by maximizing a similarity metric across a range of shifts.
- driada.intense.delay.calculate_optimal_delays(ts_bunch1, ts_bunch2, metric, shift_window, ds, verbose=True, enable_progressbar=True, mi_estimator='gcmi', engine='auto', fft_cache=None, mi_estimator_kwargs=None)
Calculate optimal temporal delays between pairs of time series.
Finds the delay that maximizes the similarity metric between each pair of time series from ts_bunch1 and ts_bunch2. This accounts for temporal offsets in neural responses relative to behavioral variables.
- Parameters:
ts_bunch1 (list of TimeSeries) – First set of time series (typically neural signals).
ts_bunch2 (list of TimeSeries) – Second set of time series (typically behavioral variables).
metric (str) – Similarity metric to maximize. See validate_metric for supported options.
shift_window (int) – Maximum shift to test in each direction (frames). Will test shifts from -shift_window to +shift_window inclusive.
ds (int) – Downsampling factor. Every ds-th point is used from the time series.
verbose (bool, default=True) – Whether to print progress information.
enable_progressbar (bool, default=True) – Whether to show progress bar.
mi_estimator (str, default='gcmi') – MI estimator to use when metric='mi'. Options: 'gcmi' or 'ksg'.
engine ({'auto', 'fft', 'loop'}, default='auto') – Computation engine for delay optimization:
  - 'auto': Use FFT when applicable (univariate continuous GCMI with >= 20 shifts)
  - 'fft': Force FFT (raises an error if not applicable)
  - 'loop': Force per-shift loop (original behavior)
fft_cache (dict, optional) – Pre-computed FFT cache from _build_fft_cache. Keys are (key1, key2) tuples using stable identifiers from _get_ts_key(). If provided, avoids redundant data extraction.
mi_estimator_kwargs (dict, optional) – Additional keyword arguments passed to the MI estimator function.
- Returns:
optimal_delays – Optimal delay (in frames) for each pair. Positive values indicate that ts2 leads ts1, negative values indicate ts1 leads ts2.
- Return type:
np.ndarray of shape (len(ts_bunch1), len(ts_bunch2))
Notes
With FFT engine: O(n1 * n2 * n log n) where n is downsampled time series length
With loop engine: O(n1 * n2 * shifts * n) where shifts = 2 * shift_window / ds
FFT provides ~10-20x speedup for typical delay windows (100-200 shifts)
The optimal delay is found by exhaustive search over all possible shifts
Memory efficient: only stores final optimal delays, not all tested values
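The contrast between the two engines can be illustrated with plain NumPy. The following is a minimal sketch, not the library's implementation: it assumes circular shifts, uses Pearson correlation as the per-shift statistic, and converts it to the Gaussian mutual information -0.5 * log(1 - r^2). The helper names (corr_by_loop, corr_by_fft, gaussian_mi) are hypothetical, and the sign of the recovered shift follows this sketch's own convention, which may differ from the one documented under Returns.

```python
import numpy as np

def corr_by_loop(x, y, max_shift):
    """Pearson correlation of x with circularly shifted y, one shift at a time."""
    shifts = np.arange(-max_shift, max_shift + 1)
    r = np.array([np.corrcoef(x, np.roll(y, s))[0, 1] for s in shifts])
    return shifts, r

def corr_by_fft(x, y, max_shift):
    """The same correlations for all circular shifts at once, via FFT."""
    n = len(x)
    xz = (x - x.mean()) / x.std()
    yz = (y - y.mean()) / y.std()
    # Circular cross-correlation: c[k] = mean_t xz[t] * yz[(t - k) mod n]
    c = np.fft.irfft(np.fft.rfft(xz) * np.conj(np.fft.rfft(yz)), n=n) / n
    shifts = np.arange(-max_shift, max_shift + 1)
    return shifts, c[shifts % n]

def gaussian_mi(r):
    """MI (in nats) of two jointly Gaussian variables with correlation r."""
    return -0.5 * np.log(1.0 - r**2)

# Synthetic data (illustrative only): y is x delayed by 7 samples plus noise,
# so aligning y back onto x requires a shift of -7 in this sketch's convention.
rng = np.random.default_rng(0)
x = rng.standard_normal(500)
y = np.roll(x, 7) + 0.1 * rng.standard_normal(500)

shifts, r_loop = corr_by_loop(x, y, max_shift=20)
_, r_fft = corr_by_fft(x, y, max_shift=20)
best_loop = int(shifts[np.argmax(gaussian_mi(r_loop))])
best_fft = int(shifts[np.argmax(gaussian_mi(r_fft))])
print(best_loop, best_fft)
```

The loop version recomputes a length-n statistic for every shift, while the FFT version obtains all shifts from a single forward and inverse transform, which is where the difference between the two complexity estimates above comes from.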
Examples
>>> import numpy as np
>>> from driada.information.info_base import TimeSeries
>>> from driada.intense.delay import calculate_optimal_delays
>>> # Two sine waves over one period; 100 points keeps the example fast
>>> t = np.linspace(0, 2*np.pi, 100)
>>> # data2 is phase-shifted by pi/4 (about 12 samples at this resolution)
>>> data1 = np.sin(t)
>>> data2 = np.sin(t + np.pi/4)
>>> ts1 = TimeSeries(data1, discrete=False)
>>> ts2 = TimeSeries(data2, discrete=False)
>>> # Search a small window for speed
>>> delays = calculate_optimal_delays([ts1], [ts2], 'mi',
...                                   shift_window=5, ds=1, verbose=False)
>>> delays.shape
(1, 1)
>>> # The returned delay lies within the search window
>>> -5 <= delays[0, 0] <= 5
True
- driada.intense.delay.calculate_optimal_delays_parallel(ts_bunch1, ts_bunch2, metric, shift_window, ds, verbose=True, n_jobs=-1, mi_estimator='gcmi', engine='auto', fft_cache=None, mi_estimator_kwargs=None)
Calculate optimal temporal delays between pairs of time series using parallel processing.
Parallel version of calculate_optimal_delays that distributes computation across multiple CPU cores for improved performance with large datasets.
- Parameters:
ts_bunch1 (list of TimeSeries) – First set of time series (typically neural signals).
ts_bunch2 (list of TimeSeries) – Second set of time series (typically behavioral variables).
metric (str) – Similarity metric to maximize. See validate_metric for supported options.
shift_window (int) – Maximum shift to test in each direction (frames). Will test shifts from -shift_window to +shift_window inclusive.
ds (int) – Downsampling factor. Every ds-th point is used from the time series.
verbose (bool, default=True) – Whether to print progress information.
n_jobs (int, default=-1) – Number of parallel jobs to run. -1 uses all available cores.
mi_estimator (str, default='gcmi') – MI estimator to use when metric='mi'. Options: 'gcmi' or 'ksg'.
engine ({'auto', 'fft', 'loop'}, default='auto') – Computation engine for delay optimization:
  - 'auto': Use FFT when applicable (univariate continuous GCMI with >= 20 shifts)
  - 'fft': Force FFT (raises an error if not applicable)
  - 'loop': Force per-shift loop (original behavior)
fft_cache (dict, optional) – Pre-computed FFT cache from _build_fft_cache. Keys are (key1, key2) tuples using stable identifiers from _get_ts_key(). Passed to each worker for cache reuse.
mi_estimator_kwargs (dict, optional) – Additional keyword arguments passed to the MI estimator function.
- Returns:
optimal_delays – Optimal delay (in frames) for each pair. Positive values indicate that ts2 leads ts1, negative values indicate ts1 leads ts2.
- Return type:
np.ndarray of shape (len(ts_bunch1), len(ts_bunch2))
Notes
Parallelization is done by splitting ts_bunch1 across workers
Each worker processes a subset of ts_bunch1 against all of ts_bunch2
Memory usage scales with number of workers
Speedup is typically sublinear due to overhead and memory bandwidth
FFT optimization within each worker provides additional speedup
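The row-wise split described in these notes can be sketched as follows. This is an illustration under stated assumptions, not the library's code: it uses a thread pool from the standard library (the documentation does not say which backend the library uses), plain NumPy arrays instead of TimeSeries objects, and absolute Pearson correlation over circular shifts as a stand-in metric. All function names here are hypothetical.

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def best_shift(x, y, max_shift):
    """Shift in [-max_shift, max_shift] that maximizes |corr(x, roll(y, s))|."""
    shifts = np.arange(-max_shift, max_shift + 1)
    scores = [abs(np.corrcoef(x, np.roll(y, s))[0, 1]) for s in shifts]
    return int(shifts[int(np.argmax(scores))])

def delays_for_rows(rows, bunch1, bunch2, max_shift):
    """One worker's job: a subset of bunch1 against all of bunch2."""
    return [[best_shift(bunch1[i], y, max_shift) for y in bunch2] for i in rows]

def delays_parallel(bunch1, bunch2, max_shift, n_workers=2):
    """Split the row indices of bunch1 into chunks, one chunk per worker."""
    chunks = [c for c in np.array_split(np.arange(len(bunch1)), n_workers) if len(c)]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # pool.map preserves chunk order, so rows come back in input order
        parts = list(pool.map(
            lambda rows: delays_for_rows(rows, bunch1, bunch2, max_shift), chunks))
    return np.array([row for part in parts for row in part], dtype=int)

# Synthetic data (illustrative only): 3 "neurons" and 2 "behaviors"
rng = np.random.default_rng(42)
neurons = [rng.standard_normal(50) for _ in range(3)]
behaviors = [rng.standard_normal(50) for _ in range(2)]

par = delays_parallel(neurons, behaviors, max_shift=3, n_workers=2)
seq = np.array(delays_for_rows(range(len(neurons)), neurons, behaviors, 3), dtype=int)
print(par.shape)
```

Because each chunk needs read access to all of the second set, every worker holds or receives a copy of it in a process-based backend, which is consistent with the note that memory usage scales with the number of workers.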
See also
calculate_optimal_delays: Sequential version of this function
Examples
>>> # Demonstrate parallel processing with minimal data
>>> import numpy as np
>>> from driada.information.info_base import TimeSeries
>>> from driada.intense.delay import calculate_optimal_delays_parallel
>>> # Create 3 neurons and 2 behaviors with 50 timepoints each
>>> np.random.seed(42)  # For a reproducible example
>>> neurons = [TimeSeries(np.random.randn(50), discrete=False) for _ in range(3)]
>>> behaviors = [TimeSeries(np.random.randn(50), discrete=False) for _ in range(2)]
>>> # Use 2 cores with a small shift window
>>> delays = calculate_optimal_delays_parallel(neurons, behaviors, 'mi',
...                                            shift_window=3, ds=1, n_jobs=2, verbose=False)
>>> delays.shape
(3, 2)
>>> # All delays should be within the window
>>> np.all(np.abs(delays) <= 3)
True