import hashlib
import scipy.sparse as ssp
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import hilbert
import numpy as np
import scipy.stats as st
[docs]
def populate_nested_dict(content, outer, inner):
"""Create a nested dictionary with specified structure and content.
Creates a two-level nested dictionary where each outer key maps to
a dictionary of inner keys, and each inner key maps to a copy of
the provided content.
Parameters
----------
content : dict or any copyable object
The content to populate at each leaf of the nested dictionary.
Must have a .copy() method. Will be copied for each inner key to avoid aliasing.
outer : list or iterable
Keys for the outer level of the nested dictionary.
inner : list or iterable
Keys for the inner level of the nested dictionary.
Returns
-------
dict
Nested dictionary with structure {outer_key: {inner_key: content_copy}}.
Raises
------
AttributeError
If content does not have a .copy() method.
Notes
-----
- Duplicate keys in outer or inner iterables will overwrite previous values.
- The content must have a .copy() method (e.g., dict, list, numpy array).
- Primitive types (int, str, float) will raise AttributeError.
Examples
--------
>>> content = {'value': 0, 'count': 0}
>>> outer = ['A', 'B']
>>> inner = ['x', 'y', 'z']
>>> nested = populate_nested_dict(content, outer, inner)
>>> nested['A']['x']
{'value': 0, 'count': 0}
>>> # Each entry is a separate copy
>>> nested['A']['x']['value'] = 5
>>> nested['B']['x']['value']
0"""
if not hasattr(content, "copy"):
raise AttributeError(
f"Content of type {type(content).__name__} does not have a .copy() method"
)
nested_dict = {o: {} for o in outer}
for o in outer:
nested_dict[o] = {i: content.copy() for i in inner}
return nested_dict
[docs]
def nested_dict_to_seq_of_tables(datadict, ordered_names1=None, ordered_names2=None):
"""Convert a nested dictionary to a sequence of 2D tables.
Transforms a nested dictionary with structure {outer: {inner: {key: value}}}
into a dictionary of 2D numpy arrays where rows correspond to outer keys
and columns to inner keys.
Parameters
----------
datadict : dict
Nested dictionary with three levels. Structure should be:
{outer_key: {inner_key: {data_key: value}}}
ordered_names1 : list or None, optional
Ordered list of outer keys to use as row indices.
If None, uses sorted outer keys. Default is None.
ordered_names2 : list or None, optional
Ordered list of inner keys to use as column indices.
If None, uses sorted inner keys. Default is None.
Returns
-------
dict
Dictionary mapping data keys to 2D numpy arrays where:
- Rows correspond to ordered_names1 (outer keys)
- Columns correspond to ordered_names2 (inner keys)
- Values are from the nested dictionary
Raises
------
ValueError
If datadict is empty.
IndexError
If datadict structure is inconsistent.
Notes
-----
- Missing values are filled with np.nan, not zeros.
- Assumes all inner dictionaries have the same data keys.
- Uses the first entry to determine the data key structure.
Examples
--------
>>> data = {
... 'A': {'x': {'metric1': 1, 'metric2': 2},
... 'y': {'metric1': 3, 'metric2': 4}},
... 'B': {'x': {'metric1': 5, 'metric2': 6},
... 'y': {'metric1': 7, 'metric2': 8}}
... }
>>> tables = nested_dict_to_seq_of_tables(data)
>>> tables['metric1']
array([[1., 3.],
[5., 7.]])
>>> # Rows are ['A', 'B'], columns are ['x', 'y']"""
# Validate input
if not datadict:
raise ValueError("Cannot process empty dictionary")
names1 = list(datadict.keys())
if not names1:
raise ValueError("Cannot process empty dictionary")
names2 = list(datadict[names1[0]].keys())
if not names2:
raise ValueError("First level dictionary is empty")
datakeys = list(datadict[names1[0]][names2[0]].keys())
if ordered_names1 is None:
ordered_names1 = sorted(names1)
if ordered_names2 is None:
ordered_names2 = sorted(names2)
table_seq = {dkey: np.zeros((len(names1), len(names2))) for dkey in datakeys}
for dkey in datakeys:
for i, n1 in enumerate(ordered_names1):
for j, n2 in enumerate(ordered_names2):
try:
table_seq[dkey][i, j] = datadict[n1][n2][dkey]
except KeyError:
table_seq[dkey][i, j] = np.nan
return table_seq
[docs]
def add_names_to_nested_dict(datadict, names1, names2):
"""Replace numeric keys in a nested dictionary with meaningful names.
Takes a nested dictionary with integer keys at two levels and replaces
them with provided names. Useful for converting indexed data structures
to named structures for better readability and access.
Parameters
----------
datadict : dict
Nested dictionary with integer keys at two levels. Expected structure
is datadict[i][j] where i and j are integers starting from 0.
names1 : list-like or None
Names to use for the first level keys. If None, uses range(n1) where
n1 is the number of first-level keys. Length must match the number
of first-level keys in datadict.
names2 : list-like or None
Names to use for the second level keys. If None, uses range(n2) where
n2 is the number of second-level keys. Length must match the number
of second-level keys in datadict.
Returns
-------
dict
New nested dictionary with the same data but keys replaced by names.
If both names1 and names2 are None, returns the original dict unchanged.
Structure: renamed_dict[name1][name2] contains datadict[i][j].
Raises
------
KeyError
If integer keys are not consecutive starting from 0.
ValueError
If names length doesn't match number of keys.
Examples
--------
>>> data = {0: {0: {'value': 1}, 1: {'value': 2}},
... 1: {0: {'value': 3}, 1: {'value': 4}}}
>>> names1 = ['row1', 'row2']
>>> names2 = ['col1', 'col2']
>>> renamed = add_names_to_nested_dict(data, names1, names2)
>>> renamed['row1']['col1']
{'value': 1}
Notes
-----
- Requires consecutive integer keys starting from 0.
- Uses .update() to merge inner dictionaries.
- Returns original dict if both names are None.
- Assumes all outer keys contain the same inner keys (e.g., if datadict[0] has keys [0,1,2],
then datadict[1], datadict[2], etc. must also have keys [0,1,2])."""
if names1 is None and names2 is None:
return datadict
# renaming for convenience
n1 = len(datadict.keys())
n2 = len(datadict[list(datadict.keys())[0]])
# Validate consecutive integer keys
outer_keys = sorted(datadict.keys())
if outer_keys != list(range(len(outer_keys))):
raise KeyError(f"Outer keys must be consecutive integers from 0. Found: {outer_keys}")
if names1 is None:
names1 = range(n1)
elif len(names1) != n1:
raise ValueError(f"names1 length ({len(names1)}) must match number of outer keys ({n1})")
if names2 is None:
names2 = range(n2)
elif len(names2) != n2:
raise ValueError(f"names2 length ({len(names2)}) must match number of inner keys ({n2})")
renamed_dict = populate_nested_dict(dict(), names1, names2)
for i in range(n1):
for j in range(n2):
renamed_dict[names1[i]][names2[j]].update(datadict[i][j])
return renamed_dict
[docs]
def retrieve_relevant_from_nested_dict(
nested_dict, target_key, target_value, operation="=", allow_missing_keys=False
):
"""Find all (outer_key, inner_key) pairs where a condition is met.
Searches through a nested dictionary structure to find all locations where
a specific key-value condition is satisfied. Useful for filtering nested
data structures based on criteria.
Parameters
----------
nested_dict : dict
Nested dictionary with structure {outer_key: {inner_key: data_dict}},
where data_dict contains the key-value pairs to search.
target_key : str
The key to look for in the innermost dictionaries.
target_value : any
The value to compare against. Must be comparable with stored values
when using ">" or "<" operations.
operation : {"=", ">", "<"}, default="="
Comparison operation to use:
* "=" : Find entries where target_key equals target_value
* ">" : Find entries where target_key is greater than target_value
* "<" : Find entries where target_key is less than target_value
allow_missing_keys : bool, default=False
If True, skip entries where target_key is missing instead of raising
an error. Missing keys are treated as not matching any criteria.
Returns
-------
list of tuples
List of (outer_key, inner_key) tuples identifying locations where
the condition is satisfied.
Raises
------
ValueError
If target_key is not found and allow_missing_keys is False, or if
an invalid operation is specified.
TypeError
If comparison operations ">" or "<" are used with incomparable types
(will propagate from Python's comparison).
Examples
--------
>>> data = {
... 'exp1': {'cell1': {'score': 0.8, 'type': 'A'},
... 'cell2': {'score': 0.6, 'type': 'B'}},
... 'exp2': {'cell1': {'score': 0.9, 'type': 'A'},
... 'cell2': {'score': 0.7, 'type': 'B'}}
... }
>>> retrieve_relevant_from_nested_dict(data, 'score', 0.7, '>')
[('exp1', 'cell1'), ('exp2', 'cell1')]
>>> retrieve_relevant_from_nested_dict(data, 'type', 'A', '=')
[('exp1', 'cell1'), ('exp2', 'cell1')]
Notes
-----
For ">" and "<" operations:
- Missing keys (when allow_missing_keys=True) are treated as not matching
- None values are treated as not matching (since None comparisons would fail)
- Incomparable types (e.g., string vs number) will raise TypeError"""
if operation not in ["=", ">", "<"]:
raise ValueError(f"Operation must be one of '=', '>', '<'. Got: {operation}")
relevant_pairs = []
for key1 in nested_dict.keys():
for key2 in nested_dict[key1].keys():
data = nested_dict[key1][key2]
if target_key not in data and not allow_missing_keys:
raise ValueError(f"Target key {target_key} not found in data dict")
if operation == "=":
criterion = data.get(target_key) == target_value
elif operation == ">":
criterion = (
data.get(target_key) > target_value
if data.get(target_key) is not None
else False
)
elif operation == "<":
criterion = (
data.get(target_key) < target_value
if data.get(target_key) is not None
else False
)
else:
raise ValueError(
f'Operation should be one of "=", ">", "<", but {operation} was passed'
)
if criterion:
relevant_pairs.append((key1, key2))
return relevant_pairs
[docs]
def rescale(data):
"""Rescale 1D data to the range [0, 1] using min-max normalization.
Applies min-max scaling to transform data linearly so that the minimum
value becomes 0 and the maximum value becomes 1. Useful for normalizing
time series or feature vectors to a common scale.
Parameters
----------
data : array-like
Input data to rescale. Must be 1-dimensional.
Returns
-------
ndarray
Rescaled data with same length as input, values in [0, 1].
If input min equals max, returns array of 0.5 values.
Raises
------
ValueError
If input data has more than 1 dimension.
Notes
-----
- Uses sklearn's MinMaxScaler internally. The transformation is:
X_scaled = (X - X.min()) / (X.max() - X.min())
- Constant arrays (where all values are equal) return 0.5.
- NaN values are preserved in the output.
- Single element arrays return 0.0.
Examples
--------
>>> data = np.array([1, 2, 3, 4, 5])
>>> rescale(data)
array([0. , 0.25, 0.5 , 0.75, 1. ])
>>> # Attempting to rescale 2D data raises an error
>>> data2d = np.array([[1, 5], [2, 4]])
>>> try:
... rescale(data2d)
... except ValueError as e:
... print(f"Error: {e}")
Error: Input data must be 1-dimensional, got shape (2, 2)"""
data = np.asarray(data)
if data.ndim > 1:
raise ValueError(f"Input data must be 1-dimensional, got shape {data.shape}")
scaler = MinMaxScaler(feature_range=(0, 1))
res = scaler.fit_transform(data.reshape(-1, 1)).ravel()
return res
[docs]
def get_hash(data):
"""Create a hash of numpy array or other data.
Parameters
----------
data : np.ndarray or other
Data to hash. For arrays, uses the raw bytes.
For other types, converts to string first.
Returns
-------
str
Hexadecimal hash string
Notes
-----
- For numpy arrays: includes shape and dtype in hash, so same data with
different shape produces different hashes.
- For non-arrays: uses str() representation which may vary across Python
versions and implementations.
- Byte order affects the hash for arrays.
- Uses SHA256 algorithm with UTF-8 encoding.
Examples
--------
>>> arr = np.array([1, 2, 3])
>>> hash1 = get_hash(arr)
>>> arr_reshaped = arr.reshape(3, 1)
>>> hash2 = get_hash(arr_reshaped)
>>> hash1 == hash2 # Different shapes produce different hashes
False"""
if isinstance(data, np.ndarray):
# For numpy arrays, use the raw bytes for consistent hashing
# Include shape and dtype in the hash to distinguish reshaped arrays
hash_id = hashlib.sha256()
hash_id.update(data.shape.__repr__().encode("utf-8"))
hash_id.update(data.dtype.str.encode("utf-8"))
hash_id.update(data.tobytes())
return hash_id.hexdigest()
else:
# For other data types, convert to string
# This is less ideal but provides a fallback
hash_id = hashlib.sha256()
hash_id.update(str(data).encode("utf-8"))
return hash_id.hexdigest()
[docs]
def phase_synchrony(vec1, vec2):
"""Calculate instantaneous phase synchrony between two signals.
Computes phase synchrony using the Hilbert transform to extract
instantaneous phases, then measures phase coupling using a
sine-based metric that ranges from 0 (no synchrony) to 1 (perfect synchrony).
Parameters
----------
vec1 : array-like
First signal, must be 1D array of same length as vec2.
vec2 : array-like
Second signal, must be 1D array of same length as vec1.
Returns
-------
ndarray
Phase synchrony values at each time point, ranging from 0 to 1.
Same length as input signals.
Raises
------
ValueError
If input signals have different lengths.
Notes
-----
The algorithm:
1. Applies Hilbert transform to get analytic signals
2. Extracts instantaneous phases using np.angle()
3. Computes phase difference at each time point
4. Maps to [0,1] using: ``1 - sin(abs(Δφ)/2)``
This metric is 1 when phases are aligned (Δφ = 0) and 0 when
maximally misaligned (Δφ = π).
- Edge effects from Hilbert transform may affect boundary values.
- Designed for real-valued signals.
- NaN values propagate through the calculation.
Examples
--------
>>> t = np.linspace(0, 1, 1000)
>>> signal1 = np.sin(2 * np.pi * 10 * t) # 10 Hz
>>> signal2 = np.sin(2 * np.pi * 10 * t + np.pi/4) # 10 Hz, phase shifted
>>> sync = phase_synchrony(signal1, signal2)
>>> round(np.mean(sync), 2) # High synchrony despite phase shift
0.62
See Also
--------
scipy.signal.hilbert :
Hilbert transform used to extract phases.
"""
vec1 = np.asarray(vec1)
vec2 = np.asarray(vec2)
if vec1.shape != vec2.shape:
raise ValueError(f"Input signals must have same shape. Got {vec1.shape} and {vec2.shape}")
al1 = np.angle(hilbert(vec1), deg=False)
al2 = np.angle(hilbert(vec2), deg=False)
phase_sync = 1 - np.sin(np.abs(al1 - al2) / 2)
return phase_sync
[docs]
def correlation_matrix(A):
"""
Compute Pearson correlation matrix between variables (rows).
Parameters
----------
A : numpy array of shape (n_variables, n_observations)
Data matrix where each row is a variable
Returns
-------
numpy array of shape (n_variables, n_variables)
Correlation matrix
Notes
-----
- Variables with zero variance (constant values) are handled by setting their
correlation to 1.0 with themselves and NaN with other variables.
- Single observation (n=1) returns NaN matrix as correlation is undefined.
- Uses row-wise computation (variables are rows).
Examples
--------
>>> A = np.array([[1, 2, 3], [4, 5, 6]])
>>> corr = correlation_matrix(A)
>>> corr.shape
(2, 2)"""
# Center the data
am = A - np.mean(A, axis=1, keepdims=True)
# Compute correlation matrix
n = A.shape[1]
if n > 1:
# Compute covariance matrix
cov = (am @ am.T) / (n - 1)
# Compute standard deviations
var_diag = np.diag(cov)
stds = np.sqrt(var_diag)
# Normalize to get correlation
# Handle zero variance case
with np.errstate(divide="ignore", invalid="ignore"):
corr = cov / np.outer(stds, stds)
# Set diagonal to 1 for zero-variance variables
np.fill_diagonal(corr, 1.0)
return corr
else:
# Single observation - correlation is undefined
return np.full((A.shape[0], A.shape[0]), np.nan)
[docs]
def cross_correlation_matrix(A, B):
"""Compute cross-correlation matrix between two sets of variables.
Computes Pearson correlations between variables (rows) in A and variables (rows) in B.
Parameters
----------
A : numpy array of shape (n_variables1, n_observations)
First data matrix where each row is a variable
B : numpy array of shape (n_variables2, n_observations)
Second data matrix where each row is a variable
Returns
-------
numpy array of shape (n_variables1, n_variables2)
Cross-correlation matrix where element [i,j] is the correlation
between A[i,:] and B[j,:]
Raises
------
ValueError
If A and B have different numbers of observations (columns).
Notes
-----
- Uses row-wise computation (variables are rows), consistent with correlation_matrix.
- Variables with zero variance will result in NaN correlations.
- Centers data by row means.
Examples
--------
>>> A = np.array([[1, 2, 3], [4, 5, 6]]) # 2 variables, 3 observations
>>> B = np.array([[7, 8, 9], [10, 11, 12]]) # 2 variables, 3 observations
>>> cross_corr = cross_correlation_matrix(A, B)
>>> cross_corr.shape
(2, 2)"""
if A.shape[1] != B.shape[1]:
raise ValueError(
f"A and B must have same number of observations. Got {A.shape[1]} and {B.shape[1]}"
)
# Center the data by row means (like correlation_matrix)
am = A - np.mean(A, axis=1, keepdims=True)
bm = B - np.mean(B, axis=1, keepdims=True)
# Compute cross-correlation
n = A.shape[1]
if n > 1:
# Handle zero variance case
with np.errstate(divide="ignore", invalid="ignore"):
# Compute standard deviations
std_a = np.sqrt(np.sum(am**2, axis=1, keepdims=True) / (n - 1))
std_b = np.sqrt(np.sum(bm**2, axis=1, keepdims=True) / (n - 1))
# Compute correlation
corr = (am @ bm.T) / (n - 1) / (std_a @ std_b.T)
return corr
else:
# Single observation - correlation is undefined
return np.full((A.shape[0], B.shape[0]), np.nan)
[docs]
def norm_cross_corr(a, b, mode="full"):
"""Compute normalized cross-correlation between two signals.
This function computes the normalized cross-correlation between two signals.
Each overlapping window is normalized independently to have zero mean and
unit variance, making the correlation insensitive to amplitude scaling and
DC offset.
Parameters
----------
a : array_like
First signal
b : array_like
Second signal
mode : {'full', 'valid', 'same'}, optional
Mode parameter controlling output size. Default is 'full'.
- 'full': returns correlation at all lags (length: len(a) + len(b) - 1)
- 'valid': returns only correlations without zero-padding (length: max(len(a) - len(b) + 1, 1))
- 'same': returns correlation of same length as first input (length: len(a))
Returns
-------
np.ndarray
Normalized cross-correlation values. Values are in the range [-1, 1].
The lag index can be computed as: lag = index - (len(a) - 1) for 'full' mode.
Raises
------
ValueError
If mode is not one of 'full', 'valid', or 'same'.
If either input signal is empty.
Notes
-----
The normalization is performed on each overlapping window separately, ensuring
that correlation values are in the range [-1, 1], where:
- 1 indicates perfect positive correlation
- -1 indicates perfect negative correlation (anti-correlation)
- 0 indicates no correlation
For constant signals (zero variance), the function returns zeros.
Examples
--------
>>> # Detect time shift between signals
>>> signal = np.sin(np.linspace(0, 4*np.pi, 100))
>>> shifted = np.roll(signal, 10) # Shift by 10 samples
>>> corr = norm_cross_corr(signal, shifted, mode='full')
>>> lag = np.argmax(corr) - (len(signal) - 1) # Should be close to -10"""
if mode not in ["full", "valid", "same"]:
raise ValueError(f"mode must be one of 'full', 'valid', or 'same'. Got: {mode}")
# Convert to numpy arrays
a = np.asarray(a, dtype=np.float64)
b = np.asarray(b, dtype=np.float64)
# Check for empty signals
if len(a) == 0 or len(b) == 0:
raise ValueError("Input signals cannot be empty")
# Handle constant signals (zero variance)
a_std = np.std(a)
b_std = np.std(b)
if a_std < 1e-10 or b_std < 1e-10:
# Return zeros for constant signals
if mode == "full":
return np.zeros(len(a) + len(b) - 1)
elif mode == "valid":
return np.zeros(max(len(a) - len(b) + 1, 1))
else: # 'same'
return np.zeros(max(len(a), len(b)))
# For normalized cross-correlation, we need to normalize each window
len_a = len(a)
len_b = len(b)
if mode == "full":
# For full mode, slide one signal across the other
n = len_a + len_b - 1
result = np.zeros(n)
# Pad signal a for easier indexing
a_padded = np.concatenate([np.zeros(len_b - 1), a, np.zeros(len_b - 1)])
for i in range(n):
# Extract overlapping portions
start = i
end = min(i + len_b, len(a_padded))
if end > start:
a_window = a_padded[start:end]
b_window = b[: end - start]
# Only compute if we have a valid window
if len(a_window) > 0 and len(b_window) > 0:
# Normalize each window
a_mean = np.mean(a_window)
b_mean = np.mean(b_window)
a_centered = a_window - a_mean
b_centered = b_window - b_mean
# Compute normalized correlation
numerator = np.sum(a_centered * b_centered)
denominator = np.sqrt(np.sum(a_centered**2) * np.sum(b_centered**2))
if denominator > 1e-10:
result[i] = numerator / denominator
elif mode == "valid":
# For valid mode, all windows have the same size
if len_a >= len_b:
result = np.zeros(len_a - len_b + 1)
for i in range(len_a - len_b + 1):
a_window = a[i : i + len_b]
# Normalize each window
a_mean = np.mean(a_window)
b_mean = np.mean(b)
a_centered = a_window - a_mean
b_centered = b - b_mean
numerator = np.sum(a_centered * b_centered)
denominator = np.sqrt(np.sum(a_centered**2) * np.sum(b_centered**2))
if denominator > 1e-10:
result[i] = numerator / denominator
else:
result = np.zeros(1)
else: # mode == 'same'
# For same mode, return central part of full correlation
full_result = norm_cross_corr(a, b, mode="full")
# Extract the central part
if len_a >= len_b:
start_idx = (len_b - 1) // 2
result = full_result[start_idx : start_idx + len_a]
else:
start_idx = (len_a - 1) // 2
result = full_result[start_idx : start_idx + len_b]
return result
[docs]
def to_numpy_array(data):
"""Convert various data types to numpy array.
Handles numpy arrays, sparse matrices, and other array-like objects.
Warning: Converting large sparse matrices can cause memory issues.
Parameters
----------
data : array_like, sparse matrix, or any object
Input data to convert to numpy array. Can be:
- numpy array (returned as-is)
- scipy sparse matrix (converted to dense)
- list, tuple, or other array-like object
Returns
-------
numpy.ndarray
Dense numpy array representation of the input data.
Warnings
--------
Converting large sparse matrices to dense arrays can cause memory issues.
Consider the memory implications before converting sparse data.
Examples
--------
>>> import scipy.sparse as sp
>>> sparse_data = sp.csr_matrix([[1, 0, 0], [0, 2, 0]])
>>> dense = to_numpy_array(sparse_data)
>>> dense.toarray() if hasattr(dense, 'toarray') else dense
array([[1, 0, 0],
[0, 2, 0]])"""
if ssp.issparse(data):
return data.toarray()
else:
return np.asarray(data)
[docs]
def remove_outliers(data, method="zscore", threshold=3.0, quantile_range=(0.05, 0.95)):
"""Remove outliers from data using various detection strategies.
Parameters
----------
data : np.ndarray
1D array of data points.
method : str, default='zscore'
Outlier detection method:
- 'zscore': Remove points beyond threshold standard deviations from mean
- 'iqr': Interquartile range method (1.5*IQR beyond Q1/Q3)
- 'mad': Median absolute deviation method
- 'quantile': Remove points outside specified quantile range
- 'isolation': Local outlier factor based on density
threshold : float, default=3.0
Detection threshold:
- For 'zscore': number of standard deviations
- For 'iqr': multiplier for IQR (typically 1.5)
- For 'mad': number of median absolute deviations
- For 'isolation': contamination fraction (0-0.5)
quantile_range : tuple, default=(0.05, 0.95)
For 'quantile' method: (lower_quantile, upper_quantile)
Returns
-------
inlier_indices : np.ndarray
Indices of non-outlier points.
clean_data : np.ndarray
Data with outliers removed.
Raises
------
ValueError
If method is not recognized.
ImportError
If 'isolation' method is used but scikit-learn is not installed.
Notes
-----
- Input data is flattened with ravel().
- For constant data (zero variance), all points are considered inliers.
- MAD method uses scaling factor 1.4826 for consistency with normal distribution.
- Isolation method uses random_state=42 for reproducibility.
Examples
--------
>>> data = np.array([1, 2, 3, 100, 4, 5]) # 100 is outlier
>>> # Z-score method
>>> indices, cleaned = remove_outliers(data, method='zscore', threshold=2)
>>> cleaned
array([1, 2, 3, 4, 5])
>>> # IQR method
>>> indices, cleaned = remove_outliers(data, method='iqr', threshold=1.5)
>>> # Quantile method
>>> indices, cleaned = remove_outliers(data, method='quantile', quantile_range=(0.1, 0.9))"""
data = np.asarray(data).ravel()
n = len(data)
if method == "zscore":
# Classical z-score method
mean = np.mean(data)
std = np.std(data)
if std == 0:
# All values are identical
inlier_mask = np.ones(n, dtype=bool)
else:
z_scores = np.abs((data - mean) / std)
inlier_mask = z_scores < threshold
elif method == "iqr":
# Interquartile range method
q1 = np.percentile(data, 25)
q3 = np.percentile(data, 75)
iqr = q3 - q1
lower_bound = q1 - threshold * iqr
upper_bound = q3 + threshold * iqr
inlier_mask = (data >= lower_bound) & (data <= upper_bound)
elif method == "mad":
# Median Absolute Deviation method (robust to outliers)
median = np.median(data)
mad = np.median(np.abs(data - median))
# Scale MAD to be consistent with standard deviation
mad_scaled = 1.4826 * mad
if mad_scaled == 0:
# All values are identical
inlier_mask = np.ones(n, dtype=bool)
else:
modified_z_scores = np.abs((data - median) / mad_scaled)
inlier_mask = modified_z_scores < threshold
elif method == "quantile":
# Simple quantile-based method
lower_bound = np.percentile(data, quantile_range[0] * 100)
upper_bound = np.percentile(data, quantile_range[1] * 100)
inlier_mask = (data >= lower_bound) & (data <= upper_bound)
elif method == "isolation":
# Isolation Forest for anomaly detection
try:
from sklearn.ensemble import IsolationForest
except ImportError:
raise ImportError(
"IsolationForest requires scikit-learn. "
"Install it with: pip install scikit-learn"
)
# Reshape for sklearn
data_reshaped = data.reshape(-1, 1)
iso_forest = IsolationForest(
contamination=min(threshold, 0.5), random_state=42 # threshold is contamination rate
)
predictions = iso_forest.fit_predict(data_reshaped)
inlier_mask = predictions == 1
else:
raise ValueError(
f"Unknown outlier detection method: {method}. "
f"Choose from: 'zscore', 'iqr', 'mad', 'quantile', 'isolation'"
)
inlier_indices = np.where(inlier_mask)[0]
clean_data = data[inlier_mask]
return inlier_indices, clean_data
[docs]
def check_nonnegative(**kwargs):
"""Check that all provided parameters are non-negative.
Validates that numeric parameters are >= 0. Useful for input validation
in functions that require non-negative values (counts, rates, probabilities).
Parameters
----------
**kwargs
Parameter name to value mappings. All values should be numeric.
Raises
------
ValueError
If any parameter value is negative, NaN, or infinite.
Error message includes parameter name and value.
Examples
--------
>>> check_nonnegative(n_neurons=10, rate=0.5) # No error
>>> check_nonnegative(n_neurons=10, rate=-0.5)
Traceback (most recent call last):
...
ValueError: rate must be non-negative, got -0.5
>>> import numpy as np
>>> check_nonnegative(count=5, prob=np.nan)
Traceback (most recent call last):
...
ValueError: prob cannot be NaN"""
for name, value in kwargs.items():
if value is None:
continue # Skip None values
try:
val = float(value)
if np.isnan(val):
raise ValueError(f"{name} cannot be NaN")
if np.isinf(val):
raise ValueError(f"{name} cannot be infinite")
if val < 0:
raise ValueError(f"{name} must be non-negative, got {value}")
except (TypeError, ValueError) as e:
if "cannot be" in str(e) or "must be" in str(e):
raise # Re-raise our validation errors
raise TypeError(f"{name} must be numeric, got {type(value).__name__}")
[docs]
def check_unit(left_open=False, right_open=False, **kwargs):
"""Check that all provided parameters are in the unit interval.
Validates that numeric parameters are within [0, 1] with configurable
bound inclusion. Useful for probabilities, fractions, and normalized values.
Parameters
----------
left_open : bool, optional
If True, left bound is open (0, 1]. If False, closed [0, 1]. Default: False.
right_open : bool, optional
If True, right bound is open [0, 1). If False, closed [0, 1]. Default: False.
**kwargs
Parameter name to value mappings. All values should be numeric in [0, 1].
Raises
------
ValueError
If any parameter value is outside the unit interval, NaN, or infinite.
Error message includes parameter name, value, and expected range.
Examples
--------
>>> check_unit(probability=0.5, fraction=0.8) # No error
>>> check_unit(probability=1.5)
Traceback (most recent call last):
...
ValueError: probability must be in [0, 1], got 1.5
>>> check_unit(left_open=True, rate=0.0)
Traceback (most recent call last):
...
ValueError: rate must be in (0, 1], got 0.0
>>> check_unit(left_open=True, right_open=True, value=0.5) # No error"""
# Determine bounds description
left_bracket = "(" if left_open else "["
right_bracket = ")" if right_open else "]"
bounds_desc = f"{left_bracket}0, 1{right_bracket}"
for name, value in kwargs.items():
if value is None:
continue # Skip None values
try:
val = float(value)
if np.isnan(val):
raise ValueError(f"{name} cannot be NaN")
if np.isinf(val):
raise ValueError(f"{name} cannot be infinite")
# Check bounds
if left_open and val <= 0:
raise ValueError(f"{name} must be in {bounds_desc}, got {value}")
elif not left_open and val < 0:
raise ValueError(f"{name} must be in {bounds_desc}, got {value}")
if right_open and val >= 1:
raise ValueError(f"{name} must be in {bounds_desc}, got {value}")
elif not right_open and val > 1:
raise ValueError(f"{name} must be in {bounds_desc}, got {value}")
except (TypeError, ValueError) as e:
if "cannot be" in str(e) or "must be" in str(e):
raise # Re-raise our validation errors
raise TypeError(f"{name} must be numeric, got {type(value).__name__}")
[docs]
def check_positive(**kwargs):
"""Check that all provided parameters are positive (> 0).
Validates that numeric parameters are strictly positive. Useful for input
validation in functions that require positive values (dimensions, sizes).
Parameters
----------
**kwargs
Parameter name to value mappings. All values should be numeric.
Raises
------
ValueError
If any parameter value is not positive, NaN, or infinite.
Error message includes parameter name and value.
Examples
--------
>>> check_positive(n_neurons=10, dim=5) # No error
>>> check_positive(n_neurons=0)
Traceback (most recent call last):
...
ValueError: n_neurons must be positive, got 0
>>> check_positive(dim=-5)
Traceback (most recent call last):
...
ValueError: dim must be positive, got -5"""
for name, value in kwargs.items():
if value is None:
continue # Skip None values
try:
val = float(value)
if np.isnan(val):
raise ValueError(f"{name} cannot be NaN")
if np.isinf(val):
raise ValueError(f"{name} cannot be infinite")
if val <= 0:
raise ValueError(f"{name} must be positive, got {value}")
except (TypeError, ValueError) as e:
if "cannot be" in str(e) or "must be" in str(e):
raise # Re-raise our validation errors
raise TypeError(f"{name} must be numeric, got {type(value).__name__}")