"""
Spatial Analysis Utilities for Neural Data
==========================================
This module provides comprehensive spatial analysis tools for neural data,
particularly for analyzing place cells, grid cells, and other spatially-modulated neurons.
Key functionality:
- Place field detection and analysis
- Grid score computation
- Spatial information metrics
- Position decoding
- Speed/direction filtering
- High-level spatial analysis pipelines
"""
import numpy as np
from scipy import ndimage, signal
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import r2_score
from typing import Optional, Tuple, Dict, List, Union
import logging
try:
from ..information import TimeSeries, MultiTimeSeries, get_sim
from .data import check_positive
except ImportError:
# For standalone module execution (e.g., doctests)
from driada.information import TimeSeries, MultiTimeSeries, get_sim
from driada.utils.data import check_positive
[docs]
def compute_occupancy_map(
positions: np.ndarray,
fps: float = 1.0,
arena_bounds: Optional[Tuple[Tuple[float, float], Tuple[float, float]]] = None,
bin_size: float = 0.025,
min_occupancy: float = 0.1,
smooth_sigma: Optional[float] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute 2D occupancy map from position data.
Parameters
----------
positions : np.ndarray, shape (n_samples, 2)
X, Y positions over time
fps : float
Frames per second (sampling frequency). Default 1.0 assumes
one frame per second.
arena_bounds : tuple of tuples, optional
((x_min, x_max), (y_min, y_max)). If None, inferred from data
bin_size : float
Size of spatial bins in same units as positions
min_occupancy : float
Minimum occupancy time (seconds) for valid bins
smooth_sigma : float, optional
Gaussian smoothing sigma in bins. If None, no smoothing
Returns
-------
occupancy_map : np.ndarray
2D occupancy map in seconds
x_edges : np.ndarray
X bin edges
y_edges : np.ndarray
Y bin edges
Raises
------
ValueError
If positions is not 2D or fps is non-positive
Examples
--------
>>> positions = np.random.rand(1000, 2) # 1000 frames
>>> # With 30 fps recording
>>> occ_map, x_edges, y_edges = compute_occupancy_map(positions, fps=30.0)
>>> # occ_map contains time in seconds per bin"""
if positions.shape[1] != 2:
raise ValueError(f"Positions must be 2D, got shape {positions.shape}")
check_positive(fps=fps)
# Determine arena bounds
if arena_bounds is None:
x_min, x_max = positions[:, 0].min(), positions[:, 0].max()
y_min, y_max = positions[:, 1].min(), positions[:, 1].max()
# Add small margin
margin = 0.05 * max(x_max - x_min, y_max - y_min)
arena_bounds = (
(x_min - margin, x_max + margin),
(y_min - margin, y_max + margin),
)
# Create bins
x_edges = np.arange(arena_bounds[0][0], arena_bounds[0][1] + bin_size, bin_size)
y_edges = np.arange(arena_bounds[1][0], arena_bounds[1][1] + bin_size, bin_size)
# Compute occupancy
occupancy, _, _ = np.histogram2d(positions[:, 0], positions[:, 1], bins=[x_edges, y_edges])
# Convert sample counts to time in seconds
occupancy = occupancy / fps
# Smooth if requested
if smooth_sigma is not None and smooth_sigma > 0:
occupancy = ndimage.gaussian_filter(occupancy, sigma=smooth_sigma)
# Set unvisited bins to NaN
occupancy[occupancy < min_occupancy] = np.nan
return occupancy.T, x_edges, y_edges # Transpose for correct orientation
[docs]
def compute_rate_map(
neural_signal: np.ndarray,
positions: np.ndarray,
occupancy_map: np.ndarray,
x_edges: np.ndarray,
y_edges: np.ndarray,
fps: float = 1.0,
smooth_sigma: Optional[float] = 1.5,
) -> np.ndarray:
"""
Compute firing rate map from neural signal and positions.
Parameters
----------
neural_signal : np.ndarray
Neural signal (e.g., calcium fluorescence, firing rates, spike counts)
positions : np.ndarray, shape (n_samples, 2)
X, Y positions corresponding to neural signal
occupancy_map : np.ndarray
2D occupancy map in seconds from compute_occupancy_map
x_edges : np.ndarray
X bin edges from compute_occupancy_map
y_edges : np.ndarray
Y bin edges from compute_occupancy_map
fps : float
Frames per second (sampling frequency). Must match the fps used
in compute_occupancy_map.
smooth_sigma : float, optional
Gaussian smoothing sigma in bins
Returns
-------
rate_map : np.ndarray
2D activity map (mean signal per spatial bin)
Raises
------
ValueError
If positions shape doesn't match neural_signal length or fps is non-positive
Examples
--------
>>> # Generate sample data
>>> import numpy as np
>>> positions = np.random.rand(1000, 2) # 1000 frames of 2D positions
>>> neural_signal = np.random.rand(1000) # Corresponding neural activity
>>> # Compute occupancy first
>>> occ_map, x_edges, y_edges = compute_occupancy_map(positions, fps=30.0)
>>> # Then compute rate map
>>> rate_map = compute_rate_map(neural_signal, positions, occ_map,
... x_edges, y_edges, fps=30.0)"""
# Validate inputs
if len(neural_signal) != len(positions):
raise ValueError(
f"neural_signal length ({len(neural_signal)}) must match "
f"positions length ({len(positions)})"
)
check_positive(fps=fps)
# For continuous signals (e.g., calcium), compute mean activity per bin
# Use weighted 2D histogram to sum activity in each bin
activity_sum, _, _ = np.histogram2d(
positions[:, 0],
positions[:, 1],
bins=[x_edges, y_edges],
weights=neural_signal, # Use signal as weights
)
activity_sum = activity_sum.T # Transpose for correct orientation
# Convert occupancy from seconds back to sample counts for rate calculation
# This ensures we use the same bins and smoothing as the occupancy map
occupancy_counts = occupancy_map * fps
# Compute mean activity per bin
with np.errstate(divide="ignore", invalid="ignore"):
rate_map = activity_sum / occupancy_counts
rate_map[occupancy_counts == 0] = 0 # Set unvisited bins to 0
rate_map[np.isnan(occupancy_map)] = 0 # Respect occupancy NaN mask
# Smooth if requested
if smooth_sigma is not None and smooth_sigma > 0:
# Only smooth visited bins
mask = occupancy_counts > 0
if np.any(mask):
# Create a smoothed version preserving only visited areas
smoothed = ndimage.gaussian_filter(rate_map, sigma=smooth_sigma)
smoothed_mask = ndimage.gaussian_filter(mask.astype(float), sigma=smooth_sigma)
with np.errstate(divide="ignore", invalid="ignore"):
rate_map = smoothed / smoothed_mask
rate_map[~mask] = 0
return rate_map
[docs]
def compute_spatial_decoding_accuracy(
neural_activity: np.ndarray,
positions: np.ndarray,
test_size: float = 0.5,
n_estimators: int = 20,
max_depth: int = 3,
min_samples_leaf: int = 50,
random_state: int = 42,
logger: Optional[logging.Logger] = None,
) -> Dict[str, float]:
"""
Compute position decoding accuracy from neural activity.
Parameters
----------
neural_activity : np.ndarray, shape (n_neurons, n_samples)
Neural activity matrix
positions : np.ndarray, shape (n_samples, 2)
True X, Y positions
test_size : float
Fraction of data for testing
n_estimators : int
Number of trees in random forest
max_depth : int
Maximum tree depth
min_samples_leaf : int
Minimum samples per leaf
random_state : int
Random seed for reproducibility
logger : logging.Logger, optional
Logger for debugging
Returns
-------
metrics : dict
Decoding accuracy metrics:
- r2_x: R² score for X position
- r2_y: R² score for Y position
- r2_avg: Average R² score
- mse: Mean squared error
Raises
------
ValueError
If shape mismatch between neural_activity and positions
Notes
-----
Forces non-negative R² scores. Uses all CPU cores.
Examples
--------
>>> import numpy as np
>>> # Create sample data: 10 neurons, 1000 time points
>>> neural_data = np.random.rand(10, 1000)
>>> positions = np.random.rand(1000, 2)
>>> metrics = compute_spatial_decoding_accuracy(neural_data, positions)
>>> print(f"Decoding R²: {metrics['r2_avg']:.3f}") # doctest: +ELLIPSIS
Decoding R²: ...
"""
# Validate inputs
if neural_activity.shape[1] != positions.shape[0]:
raise ValueError(
f"Shape mismatch: neural_activity has {neural_activity.shape[1]} "
f"samples but positions has {positions.shape[0]}"
)
if positions.shape[1] != 2:
raise ValueError(f"Positions must be 2D, got shape {positions.shape}")
check_positive(
test_size=test_size, n_estimators=n_estimators, min_samples_leaf=min_samples_leaf
)
if logger:
logger.info(f"Computing spatial decoding with {neural_activity.shape[0]} neurons")
# Prepare data (transpose for sklearn format)
X = neural_activity.T # (n_samples, n_neurons)
y = positions
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
# Train decoder
decoder = RandomForestRegressor(
n_estimators=n_estimators,
max_depth=max_depth,
min_samples_leaf=min_samples_leaf,
random_state=random_state,
n_jobs=-1,
)
decoder.fit(X_train, y_train)
y_pred = decoder.predict(X_test)
# Compute metrics
r2_x = r2_score(y_test[:, 0], y_pred[:, 0])
r2_y = r2_score(y_test[:, 1], y_pred[:, 1])
mse = np.mean((y_test - y_pred) ** 2)
metrics = {
"r2_x": max(0.0, r2_x), # Avoid negative R²
"r2_y": max(0.0, r2_y),
"r2_avg": max(0.0, (r2_x + r2_y) / 2),
"mse": mse,
}
if logger:
logger.info(f"Decoding accuracy: R²_avg = {metrics['r2_avg']:.3f}")
return metrics
[docs]
def compute_spatial_metrics(
neural_activity: np.ndarray,
positions: np.ndarray,
metrics: Optional[List[str]] = None,
**kwargs,
) -> Dict[str, Union[float, Dict]]:
"""
Compute selected spatial metrics (decoding and information only).
.. note::
Place field detection (extract_place_fields) is experimental
and not included in this function. Use INTENSE for principled
place cell detection.
Parameters
----------
neural_activity : np.ndarray, shape (n_neurons, n_samples)
Neural activity data
positions : np.ndarray, shape (n_samples, 2)
Position data
metrics : list of str, optional
Metrics to compute. If None, computes all.
Options: 'decoding', 'information'
(Removed: 'place_fields' - use INTENSE instead)
**kwargs
Additional arguments passed to analysis functions
(e.g., fps, logger, test_size)
Returns
-------
results : dict
Computed metrics based on requested analyses
Raises
------
ValueError
If invalid metric name provided or 'place_fields' requested
Examples
--------
>>> # Compute only decoding accuracy
>>> import numpy as np
>>> neural_data = np.random.rand(15, 2000) # 15 neurons, 2000 samples
>>> positions = np.random.rand(2000, 2)
>>> results = compute_spatial_metrics(neural_data, positions,
... metrics=['decoding'])
>>> # Compute all metrics
>>> import numpy as np
>>> neural_data = np.random.rand(15, 2000)
>>> positions = np.random.rand(2000, 2)
>>> results = compute_spatial_metrics(neural_data, positions)"""
if metrics is None:
metrics = ["decoding", "information"]
# Validate metrics
valid_metrics = {"decoding", "information"}
invalid = set(metrics) - valid_metrics
if invalid:
# Special error for place_fields
if "place_fields" in invalid:
raise ValueError(
"Place field detection removed from compute_spatial_metrics. "
"Use INTENSE for MI-based place cell detection."
)
raise ValueError(f"Invalid metrics: {invalid}. Valid options: {valid_metrics}")
results = {}
if "decoding" in metrics:
results["decoding"] = compute_spatial_decoding_accuracy(
neural_activity, positions, **kwargs
)
if "information" in metrics:
results["information"] = compute_spatial_information(neural_activity, positions, **kwargs)
return results