"""
Core RSA functions for computing and comparing RDMs.
"""
from __future__ import annotations
import numpy as np
import logging
from typing import Dict, Tuple, Optional, Union, TYPE_CHECKING
from scipy import stats
from scipy.spatial.distance import pdist, squareform
from ..dim_reduction.data import MVData
from ..utils.jit import is_jit_enabled
from .core_jit import (
fast_euclidean_distance,
fast_manhattan_distance,
fast_average_patterns,
)
if TYPE_CHECKING:
from ..experiment import Experiment
[docs]
def compute_rdm(
patterns: Union[np.ndarray, MVData],
metric: str = "correlation",
logger: Optional[logging.Logger] = None,
) -> np.ndarray:
"""
Compute representational dissimilarity matrix from patterns.
Parameters
----------
patterns : np.ndarray or MVData
Pattern matrix of shape (n_items, n_features) if np.ndarray (will be transposed automatically)
or MVData object
Each row is a pattern/item, each column is a feature
metric : str, default 'correlation'
Distance metric: 'correlation', 'euclidean', 'cosine', 'manhattan'
logger : logging.Logger, optional
Logger instance for debugging (currently unused)
Returns
-------
rdm : np.ndarray
Representational dissimilarity matrix (n_items, n_items)"""
# Convert to MVData if needed
if isinstance(patterns, np.ndarray):
# MVData expects (n_features, n_items) so transpose
# RSA doesn't require non-zero columns since we're computing distances
mvdata = MVData(patterns.T, allow_zero_columns=True)
else:
mvdata = patterns
# Check if we can use JIT-compiled functions for raw numpy arrays
if (
isinstance(patterns, np.ndarray)
and is_jit_enabled()
and metric in ["euclidean", "manhattan"]
):
# Use JIT only for euclidean and manhattan where it's simpler
if metric == "euclidean":
rdm = fast_euclidean_distance(patterns)
elif metric == "manhattan":
rdm = fast_manhattan_distance(patterns)
elif isinstance(patterns, np.ndarray):
# Use scipy for numpy arrays
distances = pdist(patterns, metric=metric)
rdm = squareform(distances)
else:
# Use MVData's built-in methods
if metric == "correlation":
# We need correlation between items (columns), so use axis=1
corr_mat = mvdata.corr_mat(axis=1)
rdm = 1 - corr_mat
else:
# get_distmat computes distances between columns (items)
rdm = mvdata.get_distmat(metric)
# Ensure diagonal is zero
np.fill_diagonal(rdm, 0)
# Ensure no negative values due to numerical errors
rdm = np.maximum(rdm, 0)
# Check for NaN or inf values
if np.any(np.isnan(rdm)) or np.any(np.isinf(rdm)):
import warnings
warnings.warn(
"RDM contains NaN or infinite values. This may indicate "
"constant features or numerical instability.",
RuntimeWarning,
)
return rdm
[docs]
def compute_rdm_from_timeseries_labels(
data: np.ndarray,
labels: np.ndarray,
metric: str = "correlation",
average_method: str = "mean",
) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute RDM from time series data using behavioral variable labels.
Parameters
----------
data : np.ndarray
Data array of shape (n_features, n_timepoints)
labels : np.ndarray
Label for each timepoint, shape (n_timepoints,)
Each unique label defines a condition/item
metric : str, default 'correlation'
Distance metric for RDM computation
average_method : str, default 'mean'
How to average within conditions: 'mean' or 'median'
Returns
-------
rdm : np.ndarray
Representational dissimilarity matrix
unique_labels : np.ndarray
The unique labels in order as they appear in the RDM"""
# Get unique labels (conditions)
unique_labels = np.unique(labels)
# Use JIT-compiled averaging if available and using mean
if is_jit_enabled() and average_method == "mean":
pattern_matrix = fast_average_patterns(data, labels, unique_labels)
else:
# Extract patterns for each condition
patterns = []
for label in unique_labels:
mask = labels == label
condition_data = data[:, mask]
if average_method == "mean":
pattern = np.mean(condition_data, axis=1)
elif average_method == "median":
pattern = np.median(condition_data, axis=1)
else:
raise ValueError(f"Unknown average method: {average_method}")
patterns.append(pattern)
# Stack into pattern matrix (n_conditions, n_features)
pattern_matrix = np.array(patterns)
# Compute RDM
rdm = compute_rdm(pattern_matrix, metric=metric)
return rdm, unique_labels
[docs]
def compute_rdm_from_trials(
data: np.ndarray,
trial_starts: np.ndarray,
trial_labels: np.ndarray,
trial_duration: Optional[int] = None,
metric: str = "correlation",
average_method: str = "mean",
) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute RDM from time series data using explicit trial structure.
Parameters
----------
data : np.ndarray
Data array of shape (n_features, n_timepoints)
trial_starts : np.ndarray
Start indices for each trial
trial_labels : np.ndarray
Label for each trial (same length as trial_starts)
trial_duration : int, optional
Fixed duration for each trial. If None, uses time until next trial
metric : str, default 'correlation'
Distance metric for RDM computation
average_method : str, default 'mean'
How to average within trial: 'mean' or 'median'
Returns
-------
rdm : np.ndarray
Representational dissimilarity matrix
unique_labels : np.ndarray
The unique trial labels in order as they appear in the RDM"""
# Check if trial_starts are sorted
if len(trial_starts) > 1 and not np.all(np.diff(trial_starts) > 0):
raise ValueError("trial_starts must be sorted in ascending order")
n_timepoints = data.shape[1]
n_trials = len(trial_starts)
# Extract data for each trial
trial_patterns = []
trial_labels_list = []
for i, (start, label) in enumerate(zip(trial_starts, trial_labels)):
# Validate trial start
if start < 0 or start >= n_timepoints:
raise ValueError(f"Trial {i} start index {start} is out of bounds [0, {n_timepoints})")
# Determine trial end
if trial_duration is not None:
end = min(start + trial_duration, n_timepoints)
else:
# Use next trial start or end of data
if i < n_trials - 1:
end = trial_starts[i + 1]
# Validate next trial start
if end < 0 or end > n_timepoints:
raise ValueError(
f"Trial {i+1} start index {end} is out of bounds [0, {n_timepoints}]"
)
else:
end = n_timepoints
# Extract trial data
trial_data = data[:, start:end]
if trial_data.shape[1] > 0: # Only process non-empty trials
if average_method == "mean":
pattern = np.mean(trial_data, axis=1)
elif average_method == "median":
pattern = np.median(trial_data, axis=1)
else:
raise ValueError(f"Unknown average method: {average_method}")
trial_patterns.append(pattern)
trial_labels_list.append(label)
# Get unique labels and average patterns for each condition
unique_labels = np.unique(trial_labels_list)
condition_patterns = []
for label in unique_labels:
# Find all patterns for this label
label_patterns = [
pattern for pattern, l in zip(trial_patterns, trial_labels_list) if l == label
]
# Average across repetitions of the same condition
if len(label_patterns) > 0:
if average_method == "mean":
avg_pattern = np.mean(label_patterns, axis=0)
elif average_method == "median":
avg_pattern = np.median(label_patterns, axis=0)
else:
# This should never happen due to earlier validation
raise ValueError(f"Unknown average method: {average_method}")
condition_patterns.append(avg_pattern)
# Stack into pattern matrix
pattern_matrix = np.array(condition_patterns)
# Compute RDM
rdm = compute_rdm(pattern_matrix, metric=metric)
return rdm, unique_labels
[docs]
def compare_rdms(rdm1: np.ndarray, rdm2: np.ndarray, method: str = "spearman") -> float:
"""
Compare two representational dissimilarity matrices.
Quantifies the similarity between two RDMs using correlation or
cosine similarity metrics. Only the upper triangular portion
(excluding diagonal) is compared since RDMs are symmetric.
Parameters
----------
rdm1 : np.ndarray
First RDM, square symmetric matrix of shape (n_items, n_items).
rdm2 : np.ndarray
Second RDM, must have the same shape as rdm1.
method : str, default 'spearman'
Comparison method:
- 'spearman': Spearman rank correlation (robust to monotonic transforms)
- 'pearson': Pearson correlation (assumes linear relationship)
- 'kendall': Kendall's tau (robust but slower, O(n²) complexity)
- 'cosine': Cosine similarity (angle between RDM vectors)
Returns
-------
float
Similarity score between RDMs:
- Correlations ('spearman', 'pearson', 'kendall'): Range [-1, 1]
- Cosine similarity: Range [0, 1] (NaN if either RDM has zero norm)
- Returns NaN if correlation cannot be computed (e.g., constant RDMs)
Raises
------
ValueError
If RDMs have different shapes.
If method is not one of the supported options.
RuntimeWarning
If either RDM contains NaN values (via warnings.warn).
Notes
-----
Only upper triangular values are used since RDMs are symmetric
and diagonal is uninformative (always 0).
P-values from statistical tests are computed internally but not
returned. Use bootstrap_rdm_comparison for statistical inference.
Kendall's tau is more robust than Spearman but has O(n²) complexity
in the number of unique values, making it slow for large RDMs.
For cosine similarity, if either RDM vector has zero norm (all values
identical), the function returns NaN and issues a warning.
Examples
--------
>>> import numpy as np
>>> # Create two similar RDMs
>>> rdm1 = np.array([[0, 0.5, 0.8], [0.5, 0, 0.3], [0.8, 0.3, 0]])
>>> rdm2 = np.array([[0, 0.6, 0.7], [0.6, 0, 0.4], [0.7, 0.4, 0]])
>>> # Compare using different methods
>>> pearson_sim = compare_rdms(rdm1, rdm2, method='pearson')
>>> print(f"Pearson correlation: {pearson_sim:.3f}")
Pearson correlation: 0.954
>>> spearman_sim = compare_rdms(rdm1, rdm2, method='spearman')
>>> print(f"Spearman correlation: {spearman_sim:.3f}")
Spearman correlation: 1.000
>>> # Cosine similarity
>>> cosine_sim = compare_rdms(rdm1, rdm2, method='cosine')
>>> print(f"Cosine similarity: {cosine_sim:.3f}")
Cosine similarity: 0.985
See Also
--------
compute_rdm : Compute RDMs from neural patterns
bootstrap_rdm_comparison : Statistical comparison with confidence intervals
rsa_compare : High-level interface for comparing datasets"""
# Ensure RDMs are same shape
if rdm1.shape != rdm2.shape:
raise ValueError(f"RDMs must have the same shape. Got {rdm1.shape} and {rdm2.shape}")
# Extract upper triangular parts (excluding diagonal)
mask = np.triu(np.ones_like(rdm1, dtype=bool), k=1)
rdm1_vec = rdm1[mask]
rdm2_vec = rdm2[mask]
# Check for NaN values
if np.any(np.isnan(rdm1_vec)) or np.any(np.isnan(rdm2_vec)):
import warnings
warnings.warn("RDMs contain NaN values. Correlation may return NaN.", RuntimeWarning)
# Compute similarity
if method == "spearman":
similarity, _ = stats.spearmanr(rdm1_vec, rdm2_vec)
elif method == "pearson":
similarity, _ = stats.pearsonr(rdm1_vec, rdm2_vec)
elif method == "kendall":
similarity, _ = stats.kendalltau(rdm1_vec, rdm2_vec)
elif method == "cosine":
# Cosine similarity
dot_product = np.dot(rdm1_vec, rdm2_vec)
norm1 = np.linalg.norm(rdm1_vec)
norm2 = np.linalg.norm(rdm2_vec)
if norm1 == 0 or norm2 == 0:
import warnings
warnings.warn(
"One or both RDMs have zero norm (all values identical). "
"Cosine similarity is undefined, returning NaN.",
RuntimeWarning,
)
similarity = np.nan
else:
similarity = dot_product / (norm1 * norm2)
else:
raise ValueError(f"Unknown comparison method: {method}")
return similarity
[docs]
def bootstrap_rdm_comparison(
data1: np.ndarray,
data2: np.ndarray,
labels1: np.ndarray,
labels2: np.ndarray,
metric: str = "correlation",
comparison_method: str = "spearman",
n_bootstrap: int = 1000,
random_state: Optional[int] = None,
) -> Dict[str, Union[float, np.ndarray]]:
"""
Bootstrap test for RDM similarity between two datasets.
Performs statistical inference on RDM similarity using within-condition
resampling. This maintains the experimental design while estimating
confidence intervals and assessing reliability of the similarity.
Parameters
----------
data1 : np.ndarray
First dataset of shape (n_features1, n_timepoints). Features can
be different between datasets (e.g., comparing V1 vs V2 neurons).
data2 : np.ndarray
Second dataset of shape (n_features2, n_timepoints). Must have
the same number of timepoints as data1.
labels1 : np.ndarray
Condition labels for each timepoint in data1, shape (n_timepoints,).
labels2 : np.ndarray
Condition labels for each timepoint in data2, shape (n_timepoints,).
Must contain the same unique values as labels1.
metric : str, default 'correlation'
Distance metric for RDM computation. See compute_rdm for options.
comparison_method : str, default 'spearman'
Method for comparing RDMs. See compare_rdms for options.
n_bootstrap : int, default 1000
Number of bootstrap iterations. Higher values give more stable
estimates but take longer.
random_state : int, optional
Random seed for reproducibility. Creates a local RandomState
to avoid affecting global random state.
Returns
-------
dict
Dictionary containing:
- 'observed': float, observed RDM similarity between datasets
- 'bootstrap_distribution': np.ndarray, bootstrap similarity values
- 'p_value': float, two-tailed test of observed vs bootstrap mean
- 'ci_lower': float, 2.5th percentile of bootstrap distribution
- 'ci_upper': float, 97.5th percentile of bootstrap distribution
- 'mean': float, mean of bootstrap distribution
- 'std': float, standard deviation of bootstrap distribution
Raises
------
ValueError
If datasets don't have the same unique condition labels.
Notes
-----
The bootstrap procedure:
1. Resamples timepoints within each condition independently
2. Maintains the number of samples per condition
3. Computes RDMs from resampled data
4. Calculates similarity between resampled RDMs
This within-condition resampling preserves the experimental design
while capturing trial-by-trial variability.
The p-value tests whether the observed similarity is extreme
relative to the bootstrap distribution mean. This is NOT a
standard null hypothesis test but rather a stability assessment.
Uses a local RandomState to avoid modifying global numpy random
state, ensuring thread safety and reproducibility.
Examples
--------
>>> import numpy as np
>>> # Create two datasets with different patterns but similar structure
>>> np.random.seed(42)
>>> n_features = 20
>>> n_timepoints = 90
>>>
>>> # 3 conditions, 30 samples each
>>> labels = np.repeat(['A', 'B', 'C'], 30)
>>>
>>> # Create data with condition-specific patterns
>>> v1_data = np.random.randn(n_features, n_timepoints)
>>> v2_data = np.random.randn(n_features, n_timepoints)
>>>
>>> # Bootstrap will resample and compute similarity distribution
>>> results = bootstrap_rdm_comparison(
... v1_data, v2_data, labels, labels,
... n_bootstrap=50, random_state=42
... )
>>>
>>> # Check that results contain expected keys
>>> print('Keys:', sorted(results.keys()))
Keys: ['bootstrap_distribution', 'ci_lower', 'ci_upper', 'mean', 'observed', 'p_value', 'std']
>>>
>>> # Random data should give low correlation
>>> print(f"Observed between -1 and 1: {-1 <= results['observed'] <= 1}")
Observed between -1 and 1: True
See Also
--------
compare_rdms : Direct RDM comparison without bootstrap
compute_rdm_from_timeseries_labels : Compute RDM from labeled data
rsa_compare : High-level interface with multiple data types"""
# Create a local random number generator to avoid modifying global state
if random_state is not None:
rng = np.random.RandomState(random_state)
else:
rng = np.random.RandomState()
# Compute observed RDM similarity
rdm1, _ = compute_rdm_from_timeseries_labels(data1, labels1, metric=metric)
rdm2, _ = compute_rdm_from_timeseries_labels(data2, labels2, metric=metric)
observed_similarity = compare_rdms(rdm1, rdm2, method=comparison_method)
# Get unique conditions
unique_conditions1 = np.unique(labels1)
unique_conditions2 = np.unique(labels2)
# Check that both datasets have same conditions
if not np.array_equal(unique_conditions1, unique_conditions2):
raise ValueError("Both datasets must have the same set of condition labels")
# Bootstrap with within-condition resampling
bootstrap_similarities = []
for _ in range(n_bootstrap):
# Resample within each condition to maintain balance
idx1 = []
idx2 = []
for condition in unique_conditions1:
# Get indices for this condition
cond_idx1 = np.where(labels1 == condition)[0]
cond_idx2 = np.where(labels2 == condition)[0]
# Resample with replacement within condition
n_samples1 = len(cond_idx1)
n_samples2 = len(cond_idx2)
resampled_idx1 = rng.choice(cond_idx1, size=n_samples1, replace=True)
resampled_idx2 = rng.choice(cond_idx2, size=n_samples2, replace=True)
idx1.extend(resampled_idx1)
idx2.extend(resampled_idx2)
# Convert to arrays and shuffle to mix conditions
idx1 = np.array(idx1)
idx2 = np.array(idx2)
rng.shuffle(idx1)
rng.shuffle(idx2)
# Compute RDMs on resampled data
rdm1_boot, _ = compute_rdm_from_timeseries_labels(
data1[:, idx1], labels1[idx1], metric=metric
)
rdm2_boot, _ = compute_rdm_from_timeseries_labels(
data2[:, idx2], labels2[idx2], metric=metric
)
# Compare
sim_boot = compare_rdms(rdm1_boot, rdm2_boot, method=comparison_method)
bootstrap_similarities.append(sim_boot)
bootstrap_similarities = np.array(bootstrap_similarities)
# Compute statistics
mean_similarity = np.mean(bootstrap_similarities)
std_similarity = np.std(bootstrap_similarities)
# Compute p-value (two-tailed) - tests if observed is extreme relative to bootstrap mean
p_value = np.mean(
np.abs(bootstrap_similarities - mean_similarity)
>= np.abs(observed_similarity - mean_similarity)
)
# Compute confidence intervals
ci_lower = np.percentile(bootstrap_similarities, 2.5)
ci_upper = np.percentile(bootstrap_similarities, 97.5)
return {
"observed": observed_similarity,
"bootstrap_distribution": bootstrap_similarities,
"p_value": p_value,
"ci_lower": ci_lower,
"ci_upper": ci_upper,
"mean": mean_similarity,
"std": std_similarity,
}
# Unified API function
[docs]
def compute_rdm_unified(
data: Union[np.ndarray, MVData, Experiment],
items: Optional[Union[np.ndarray, str, Dict]] = None,
data_type: str = "calcium",
metric: str = "correlation",
average_method: str = "mean",
trial_duration: Optional[int] = None,
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
"""
Compute RDM with automatic data type detection and dispatching.
This function intelligently dispatches to the appropriate RDM computation
method based on the input data type and items specification. It provides
a unified interface for computing RDMs from various data structures
(arrays, MVData, Experiments) and item definitions (pre-averaged patterns,
timeseries labels, or trial structures).
Parameters
----------
data : np.ndarray, MVData, or Experiment
The data to compute RDM from:
- np.ndarray: Raw data matrix (n_features, n_timepoints)
- MVData: MVData object
- Experiment: DRIADA Experiment object
items : np.ndarray, str, dict, or None
How to define items/conditions:
- None: Compute RDM directly from patterns (requires pre-averaged data)
- np.ndarray: Condition labels for each timepoint
- str: Name of dynamic feature (for Experiment objects)
- dict: Trial structure with 'trial_starts' and 'trial_labels'
data_type : str, default 'calcium'
For Experiment objects, which data type to use ('calcium' or 'spikes')
metric : str, default 'correlation'
Distance metric for RDM computation. Options: 'correlation',
'euclidean', 'cosine', 'manhattan'
average_method : str, default 'mean'
How to average within conditions ('mean' or 'median')
trial_duration : int, optional
For trial structure, fixed duration for each trial. If specified
in both parameter and items dict, dict value takes precedence.
Returns
-------
rdm : np.ndarray
Representational dissimilarity matrix
labels : np.ndarray or None
The unique labels/conditions if items were specified
Raises
------
ValueError
If items required but not provided for Experiment objects.
If trial structure dict missing required keys.
If metric is not one of the valid options.
If MVData/Embedding used with trial structure.
Notes
-----
Imports are performed inside the function to avoid circular
dependencies. This has minimal performance impact as the function
is typically called only a few times per analysis.
When trial_duration is specified in both the items dict and as a
parameter, the dict value takes precedence and a warning is issued.
Examples
--------
>>> import numpy as np
>>> # Direct pattern RDM (pre-averaged data)
>>> patterns = np.random.randn(10, 50) # 10 items, 50 features
>>> rdm, _ = compute_rdm_unified(patterns)
>>> print(f"RDM shape: {rdm.shape}")
RDM shape: (10, 10)
>>> # From time series with labels
>>> data = np.random.randn(100, 1000) # 100 features, 1000 timepoints
>>> labels = np.repeat([0, 1, 2, 3], 250)
>>> rdm, unique_labels = compute_rdm_unified(data, labels)
>>> print(f"RDM shape: {rdm.shape}, unique labels: {unique_labels}")
RDM shape: (4, 4), unique labels: [0 1 2 3]
>>> # From MVData object
>>> from driada.dim_reduction.data import MVData
>>> mvdata = MVData(np.random.randn(50, 100)) # 50 features, 100 samples
>>> rdm, _ = compute_rdm_unified(mvdata)
>>> print(f"RDM shape: {rdm.shape}")
RDM shape: (100, 100)
See Also
--------
compute_rdm : Direct RDM computation from patterns
compute_rdm_from_timeseries_labels : RDM from labeled timeseries
compute_rdm_from_trials : RDM from trial structure
~driada.rsa.integration.compute_experiment_rdm : RDM from Experiment objects"""
# Import here to avoid circular dependency
from ..experiment import Experiment
from ..dim_reduction.embedding import Embedding
from .integration import compute_experiment_rdm, compute_mvdata_rdm
# Handle Embedding objects
if isinstance(data, Embedding):
# Convert embedding to MVData and process
mvdata = data.to_mvdata()
if items is None:
# Direct pattern RDM - embedding already represents patterns
return compute_rdm(mvdata, metric=metric), None
else:
# Items specified - use MVData processing
if isinstance(items, dict):
raise ValueError(
"Trial structure not supported for Embedding objects. Use labels array instead."
)
return compute_mvdata_rdm(mvdata, items, metric=metric, average_method=average_method)
# Handle Experiment objects
elif isinstance(data, Experiment):
if items is None:
raise ValueError("items must be specified for Experiment objects")
return compute_experiment_rdm(
data,
items,
data_type=data_type,
metric=metric,
average_method=average_method,
)
# Handle MVData objects
elif isinstance(data, MVData):
if items is None:
# Direct pattern RDM - assume data is already averaged
# MVData stores as (n_features, n_items)
return compute_rdm(data, metric=metric), None
else:
if isinstance(items, dict):
# Trial structure not directly supported for MVData
raise ValueError(
"Trial structure not supported for MVData. Use labels array instead."
)
# Items should be labels array
return compute_mvdata_rdm(data, items, metric=metric, average_method=average_method)
# Handle numpy arrays
else:
# Validate metric
valid_metrics = ["correlation", "euclidean", "cosine", "manhattan"]
if metric not in valid_metrics:
raise ValueError(f"Invalid metric '{metric}'. Must be one of {valid_metrics}")
if items is None:
# Direct pattern RDM - assume rows are patterns
return compute_rdm(data, metric=metric), None
elif isinstance(items, dict):
# Trial structure
# Handle trial_duration conflict
if "trial_duration" in items and trial_duration is not None:
import warnings
warnings.warn(
"trial_duration specified in both items dict and parameter. "
"Using value from items dict.",
UserWarning,
)
return compute_rdm_from_trials(
data,
trial_starts=items["trial_starts"],
trial_labels=items["trial_labels"],
trial_duration=items.get("trial_duration", trial_duration),
metric=metric,
average_method=average_method,
)
else:
# Labels array
return compute_rdm_from_timeseries_labels(
data, items, metric=metric, average_method=average_method
)
# Simplified high-level API for common use case
[docs]
def rsa_compare(
data1: Union[np.ndarray, MVData, Experiment],
data2: Union[np.ndarray, MVData, Experiment],
items: Optional[Union[str, Dict]] = None,
metric: str = "correlation",
comparison: str = "spearman",
data_type: str = "calcium",
logger: Optional[logging.Logger] = None,
) -> float:
"""
Compare neural representations using RSA.
This is a simplified API for the most common RSA use case: comparing
two sets of neural representations. It automatically handles different
data types (arrays, MVData, Embeddings, Experiments) and computes the
similarity between their representational geometries.
Parameters
----------
data1 : np.ndarray, MVData, or Experiment
First dataset (n_items, n_features) if array, MVData object, or Experiment
data2 : np.ndarray, MVData, or Experiment
Second dataset (same n_items as data1)
items : str, dict, or None
How to define conditions (required for Experiment objects):
- None: For arrays/MVData, assumes data is already averaged per item
- str: Name of dynamic feature (e.g., 'stimulus_type')
- dict: Trial structure with 'trial_starts' and 'trial_labels'
metric : str, default 'correlation'
Distance metric for RDM computation
comparison : str, default 'spearman'
Method for comparing RDMs ('spearman', 'pearson', 'kendall', 'cosine')
data_type : str, default 'calcium'
For Experiment objects, which data to use ('calcium' or 'spikes')
logger : logging.Logger, optional
Logger for debugging messages
Returns
-------
similarity : float
Similarity score between the two neural representations.
Range depends on comparison method: [-1, 1] for correlations,
[0, 1] for cosine similarity.
Raises
------
ValueError
If items not specified for Experiment objects.
If trying to compare Experiment with non-Experiment data.
If RDMs have incompatible shapes (different numbers of items).
Notes
-----
Imports are performed inside the function to avoid circular
dependencies. Embedding objects are automatically converted to
MVData for uniform processing.
When comparing arrays or MVData without items specification,
assumes the data is already averaged per condition (each row
represents one item/condition).
Examples
--------
>>> import numpy as np
>>> # Compare two brain areas with structured data
>>> np.random.seed(42)
>>> n_stimuli = 5
>>> n_neurons_v1, n_neurons_v2 = 20, 15
>>>
>>> # Create orthogonal patterns for each stimulus
>>> v1_data = np.zeros((n_stimuli, n_neurons_v1))
>>> v2_data = np.zeros((n_stimuli, n_neurons_v2))
>>>
>>> # Each stimulus activates different neurons in both areas
>>> for i in range(n_stimuli):
... # V1: each stimulus activates 4 specific neurons
... v1_data[i, i*4:(i+1)*4] = 1.0
... # V2: similar pattern with 3 neurons per stimulus
... v2_data[i, i*3:(i+1)*3] = 1.0
>>>
>>> # Add small noise for realism
>>> v1_data += 0.1 * np.random.randn(n_stimuli, n_neurons_v1)
>>> v2_data += 0.1 * np.random.randn(n_stimuli, n_neurons_v2)
>>>
>>> # This creates similar RDM structure in both areas
>>> similarity = rsa_compare(v1_data, v2_data, comparison='spearman')
>>> print(f"RSA similarity: {similarity:.3f}")
RSA similarity: 0.479
>>> # Compare using compute_rdm_unified first
>>> from driada.rsa import compute_rdm_unified
>>> np.random.seed(123)
>>> data1 = np.random.randn(50, 90) # 50 features, 90 timepoints
>>> data2 = np.random.randn(40, 90) # 40 features, same timepoints
>>> labels = np.repeat(['A', 'B', 'C'], 30) # 30 samples per condition
>>> # First compute RDMs with labels
>>> rdm1, _ = compute_rdm_unified(data1, items=labels)
>>> rdm2, _ = compute_rdm_unified(data2, items=labels)
>>> # Both RDMs now have shape (3, 3) for the 3 conditions
>>> print(f"RDM shapes: {rdm1.shape}, {rdm2.shape}")
RDM shapes: (3, 3), (3, 3)
>>> # Compare the RDMs
>>> from driada.rsa import compare_rdms
>>> similarity = compare_rdms(rdm1, rdm2)
>>> print(f"RSA similarity between -1 and 1: {-1 <= similarity <= 1}")
RSA similarity between -1 and 1: True
See Also
--------
compute_rdm_unified : Unified RDM computation interface
compare_rdms : Direct comparison of RDM matrices
bootstrap_rdm_comparison : Statistical comparison with confidence intervals"""
# Import here to avoid circular dependency
from ..experiment import Experiment
from ..dim_reduction.embedding import Embedding
from .integration import rsa_between_experiments
if logger is None:
logger = logging.getLogger(__name__)
# Handle Embedding objects
if isinstance(data1, Embedding) or isinstance(data2, Embedding):
# Convert embeddings to MVData for uniform processing
if isinstance(data1, Embedding):
data1 = data1.to_mvdata()
if isinstance(data2, Embedding):
data2 = data2.to_mvdata()
logger.debug("Converted Embedding objects to MVData for RSA comparison")
# Check if both are Experiment objects
if isinstance(data1, Experiment) and isinstance(data2, Experiment):
if items is None:
raise ValueError("items must be specified when comparing Experiment objects")
logger.debug("Comparing two Experiment objects using RSA")
return rsa_between_experiments(
data1,
data2,
items=items,
data_type=data_type,
metric=metric,
comparison_method=comparison,
average_method="mean",
)
# Check if mixed types
elif isinstance(data1, Experiment) or isinstance(data2, Experiment):
raise ValueError(
"Cannot compare Experiment with non-Experiment data. Both inputs must be same type."
)
# Original behavior for arrays/MVData
else:
logger.debug("Computing RDMs for RSA comparison")
# Compute RDMs
rdm1 = compute_rdm(data1, metric=metric, logger=logger)
rdm2 = compute_rdm(data2, metric=metric, logger=logger)
# Check dimension compatibility
if rdm1.shape != rdm2.shape:
raise ValueError(
f"RDMs have incompatible shapes: {rdm1.shape} vs {rdm2.shape}. "
"Ensure both datasets have the same number of items/conditions."
)
# Compare RDMs
similarity = compare_rdms(rdm1, rdm2, method=comparison)
logger.debug(f"RSA comparison complete. Similarity: {similarity:.3f}")
return similarity