Source code for driada.intense.io

"""I/O utilities for INTENSE results."""
import json
import sys

import numpy as np


def _numpy_to_python(obj):
    """Convert numpy types to Python native types for JSON serialization.

    Recursively converts numpy scalars, arrays, and nested containers to
    JSON-serializable Python types.

    Parameters
    ----------
    obj : any
        Object to convert. Can be numpy scalar, array, dict, list, or native Python type.

    Returns
    -------
    any
        JSON-serializable Python object.
    """
    if obj is None:
        return None
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {str(k): _numpy_to_python(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_numpy_to_python(v) for v in obj]
    return obj


[docs] class IntenseResults: """ Container for INTENSE computation results. Attributes ---------- info : dict Metadata about the computation (optimal delays, thresholds, etc.). intense_params : dict Parameters used for the INTENSE computation. stats : dict Statistical results (p-values, metric values, etc.). significance : dict Significance test results for each neuron-feature pair. Methods ------- update(property_name, data) Add or update a property with data. update_multiple(datadict) Update multiple properties from a dictionary. Examples -------- >>> # Create results container and add analysis outputs >>> results = IntenseResults() >>> # Add statistical results >>> results.update('stats', {'neuron1': {'feature1': {'me': 0.5, 'pval': 0.01}}}) >>> # Add computation metadata >>> results.update('info', {'optimal_delays': [[0, 5], [10, 0]], ... 'n_shuffles': 1000}) >>> # Access stored data >>> results.stats['neuron1']['feature1']['me'] 0.5 """
[docs] def __init__(self): """ Initialize an empty IntenseResults container. Creates an IntenseResults object with no initial data. Properties are added dynamically using the update() or update_multiple() methods. Notes ----- The IntenseResults class serves as a flexible container for storing INTENSE computation outputs. It allows dynamic addition of properties to accommodate different analysis configurations and results. Common properties added during INTENSE analysis: - 'stats': Statistical test results (p-values, metric values) - 'significance': Binary significance indicators - 'info': Computation metadata (delays, parameters used) - 'intense_params': Parameters used for the computation See Also -------- ~driada.intense.pipelines.compute_cell_feat_significance : Main function that returns IntenseResults """ pass
[docs] def update(self, property_name, data): """Add or update a property with data. Stores analysis results as attributes of the IntenseResults object, allowing flexible storage of various data types and structures. Parameters ---------- property_name : str Name of the property to store. Will become an attribute of the object accessible via dot notation. data : any Data to store. Can be any Python object: arrays, dictionaries, dataframes, custom objects, etc. Examples -------- >>> # Store different types of analysis results >>> import numpy as np >>> results = IntenseResults() >>> # Add mutual information matrix >>> results.update('mi_matrix', np.array([[0, 0.5], [0.5, 0]])) >>> # Add list of significant neuron-feature pairs >>> results.update('significant_pairs', [(0, 1), (2, 3)]) >>> # Access via attribute notation >>> results.mi_matrix array([[0. , 0.5], [0.5, 0. ]]) >>> results.significant_pairs [(0, 1), (2, 3)] Notes ----- Property names should be valid Python identifiers. Existing properties will be overwritten without warning. """ setattr(self, property_name, data)
[docs] def update_multiple(self, datadict): """Update multiple properties from a dictionary. Batch update of multiple properties at once, useful for storing related analysis results together. Parameters ---------- datadict : dict Dictionary mapping property names to data values. Each key-value pair will be stored as an attribute. Examples -------- >>> # Batch update multiple analysis results at once >>> import numpy as np >>> results = IntenseResults() >>> # Add multiple related results together >>> results.update_multiple({ ... 'mi_values': np.array([0.1, 0.5, 0.3]), ... 'p_values': np.array([0.05, 0.001, 0.02]), ... 'significant': np.array([False, True, True]), ... 'parameters': {'metric': 'mi', 'correction': 'fdr'} ... }) >>> # All properties are now accessible >>> results.mi_values array([0.1, 0.5, 0.3]) >>> results.significant array([False, True, True]) See Also -------- update : Add single property """ for dname, data in datadict.items(): setattr(self, dname, data)
[docs] def populate_stage1_pvals(self): """Reconstruct and populate pre_pval in stats from saved Stage 1 shuffle data. Fills in stats[neuron][feature]["pre_pval"] (currently None) with actual p-values computed from the saved me_total1 distributions. Modifies self.stats in-place. Returns ------- pre_pvals : np.ndarray, shape (n_neurons, n_features) The reconstructed p-value array. mi_values : np.ndarray, shape (n_neurons, n_features) The MI values array. Raises ------ AttributeError If required attributes (info, stats, intense_params) are missing. KeyError If me_total1 is not present in info. """ from .stats import reconstruct_stage1_pvals me_total1 = self.info["me_total1"] metric_distr_type = self.intense_params.get("metric_distr_type", "gamma") pre_pvals, mi_values = reconstruct_stage1_pvals( me_total1, metric_distr_type=metric_distr_type ) # Map indices to names and populate stats in-place # After JSON round-trip, dict keys become strings — resolve via values list neurons_map = self.intense_params["neurons"] features_map = self.intense_params["feat_bunch"] neuron_names = list(neurons_map.values()) if isinstance(neurons_map, dict) else neurons_map feature_names = list(features_map.values()) if isinstance(features_map, dict) else features_map n1, n2 = me_total1.shape[0], me_total1.shape[1] for i in range(n1): neuron_name = str(neuron_names[i]) if neuron_name not in self.stats: continue for j in range(n2): feature_name = str(feature_names[j]) if feature_name not in self.stats[neuron_name]: continue if not np.isnan(pre_pvals[i, j]): self.stats[neuron_name][feature_name]["pre_pval"] = float(pre_pvals[i, j]) return pre_pvals, mi_values
[docs] def validate_against_ground_truth(self, ground_truth, verbose=True): """ Compare INTENSE detections against known ground truth. Validates the analysis results against a ground truth dictionary, typically generated by generate_tuned_selectivity_exp(). Computes sensitivity, precision, F1 score, and per-type detection rates. Parameters ---------- ground_truth : dict Ground truth from generate_tuned_selectivity_exp(). Must contain: - "expected_pairs" : list of (neuron_idx, feature_name) tuples - "neuron_types" : dict mapping neuron_idx to group name (optional) verbose : bool, optional Print detailed results. Default: True. Returns ------- metrics : dict Validation metrics containing: - "true_positives" : int - Number of correctly detected pairs - "false_positives" : int - Number of spurious detections - "false_negatives" : int - Number of missed pairs - "sensitivity" : float - TP / (TP + FN) - "precision" : float - TP / (TP + FP) - "f1" : float - Harmonic mean of sensitivity and precision - "type_stats" : dict - Per-neuron-type statistics - "tp_pairs" : set - True positive (neuron, feature) pairs - "fp_pairs" : set - False positive pairs - "fn_pairs" : set - False negative pairs Notes ----- This method requires that the IntenseResults object has a 'significance' attribute populated with neuron-feature significance results. Examples -------- >>> # After running INTENSE analysis >>> results = IntenseResults() >>> # ... populate with analysis results ... >>> ground_truth = {"expected_pairs": [(0, "hd"), (1, "x")], ... "neuron_types": {0: "hd_cell", 1: "place_cell"}} >>> # metrics = results.validate_against_ground_truth(ground_truth) See Also -------- ~driada.experiment.synthetic.generators.generate_tuned_selectivity_exp : Generates experiments with ground truth """ if not hasattr(self, "significance"): raise ValueError( "IntenseResults has no 'significance' attribute. " "Run INTENSE analysis first." ) # Extract detected pairs from significance results # Check for both 'stage2' (two-stage mode) and 'criterion' (single-stage) detected_pairs = set() for neuron_id in self.significance: for feat_name in self.significance[neuron_id]: sig_entry = self.significance[neuron_id][feat_name] # Two-stage mode uses 'stage2', single-stage uses 'criterion' is_significant = sig_entry.get("stage2") or sig_entry.get("criterion") if is_significant: detected_pairs.add((neuron_id, feat_name)) expected_pairs = set(ground_truth["expected_pairs"]) # Calculate metrics true_positives = detected_pairs & expected_pairs false_positives = detected_pairs - expected_pairs false_negatives = expected_pairs - detected_pairs n_tp = len(true_positives) n_fp = len(false_positives) n_fn = len(false_negatives) sensitivity = n_tp / max(n_tp + n_fn, 1) precision = n_tp / max(n_tp + n_fp, 1) f1 = 2 * sensitivity * precision / max(sensitivity + precision, 1e-10) # Per-neuron-type breakdown type_stats = {} neuron_types = ground_truth.get("neuron_types", {}) if neuron_types: # Get unique neuron types (excluding nonselective) unique_types = set(neuron_types.values()) for neuron_type in unique_types: if "nonselective" in neuron_type.lower(): continue type_expected = [ p for p in expected_pairs if neuron_types.get(p[0]) == neuron_type ] type_detected = [ p for p in true_positives if neuron_types.get(p[0]) == neuron_type ] type_stats[neuron_type] = { "expected": len(type_expected), "detected": len(type_detected), "sensitivity": len(type_detected) / max(len(type_expected), 1), } metrics = { "true_positives": n_tp, "false_positives": n_fp, "false_negatives": n_fn, "sensitivity": sensitivity, "precision": precision, "f1": f1, "type_stats": type_stats, "tp_pairs": true_positives, "fp_pairs": false_positives, "fn_pairs": false_negatives, } if verbose: print("\n" + "=" * 60) print("GROUND TRUTH VALIDATION") print("=" * 60) print(f"\nOverall Performance:") print(f" True Positives: {n_tp}") print(f" False Positives: {n_fp}") print(f" False Negatives: {n_fn}") print(f" Sensitivity: {sensitivity:.1%}") print(f" Precision: {precision:.1%}") print(f" F1 Score: {f1:.1%}") if type_stats: print(f"\nPer-Type Detection Rates:") for neuron_type, stats in sorted(type_stats.items()): print( f" {neuron_type:15s}: {stats['detected']}/{stats['expected']} " f"({stats['sensitivity']:.0%})" ) if false_positives: print(f"\nFalse Positives (unexpected detections):") for neuron_id, feat in sorted(false_positives)[:5]: ntype = neuron_types.get(neuron_id, "unknown") print(f" Neuron {neuron_id} ({ntype}) -> {feat}") if len(false_positives) > 5: print(f" ... and {len(false_positives) - 5} more") if false_negatives: print(f"\nFalse Negatives (missed detections):") for neuron_id, feat in sorted(false_negatives)[:5]: ntype = neuron_types.get(neuron_id, "unknown") print(f" Neuron {neuron_id} ({ntype}) -> {feat}") if len(false_negatives) > 5: print(f" ... and {len(false_negatives) - 5} more") return metrics
[docs] def memory_usage(self): """ Return memory usage breakdown in bytes. Analyzes the memory consumption of all stored data in the IntenseResults object, providing a detailed breakdown by attribute. Useful for diagnosing memory issues and verifying that memory optimizations (like store_random_shifts=False) are working as expected. Returns ------- usage : dict Dictionary mapping attribute names to their memory usage in bytes. Keys include: - "info.{key}": Memory for each numpy array in the info dict - "info.{key}": Memory for DataFrames in info (sum of all columns) - "stats": Approximate memory for stats dict - "significance": Approximate memory for significance dict Notes ----- - For numpy arrays, uses the .nbytes attribute for accurate measurement - For pandas DataFrames, uses memory_usage(deep=True) for accurate measurement - For other objects, uses sys.getsizeof() which may underestimate nested structures - The "random_shifts1" and "random_shifts2" arrays are the largest consumers when store_random_shifts=True Examples -------- >>> from driada.intense import IntenseResults >>> import numpy as np >>> results = IntenseResults() >>> results.update('info', { ... 'me_total1': np.zeros((10, 5, 101)), ... 'me_total2': np.zeros((10, 5, 10001)) ... }) >>> usage = results.memory_usage() >>> 'info.me_total1' in usage True >>> usage['info.me_total1'] 40400 """ usage = {} if hasattr(self, 'info') and isinstance(self.info, dict): for key, val in self.info.items(): if isinstance(val, np.ndarray): usage[f"info.{key}"] = val.nbytes elif isinstance(val, list) and val: # Check if list of DataFrames if hasattr(val[0], 'memory_usage'): total = sum(df.memory_usage(deep=True).sum() for df in val) usage[f"info.{key}"] = int(total) else: usage[f"info.{key}"] = sys.getsizeof(val) else: usage[f"info.{key}"] = sys.getsizeof(val) if hasattr(self, 'stats'): usage["stats"] = sys.getsizeof(self.stats) if hasattr(self, 'significance'): usage["significance"] = sys.getsizeof(self.significance) if hasattr(self, 'intense_params'): usage["intense_params"] = sys.getsizeof(self.intense_params) return usage
[docs] def save_results(results, fname, compressed=False): """Save IntenseResults to NPZ file. Arrays stored as NPZ arrays, dicts embedded as JSON strings. Uncompressed is fastest; compressed saves ~10x disk space. Parameters ---------- results : IntenseResults Results object from INTENSE analysis. fname : str or Path Output file path (should end with .npz). compressed : bool, default False If True, use zlib compression (slower but 10x smaller files). Examples -------- >>> from driada.intense.io import IntenseResults, save_results, load_results >>> results = IntenseResults() >>> results.update('stats', {'cell1': {'feat1': {'me': 0.5}}}) >>> save_results(results, 'test.npz') >>> loaded = load_results('test.npz') >>> loaded.stats['cell1']['feat1']['me'] 0.5 """ arrays = {} # 1. Extract arrays from info info_scalars = {} if hasattr(results, 'info') and results.info: for key, val in results.info.items(): if isinstance(val, np.ndarray): # Use sparse float16 for large MI arrays (me_total1, me_total2) if key.startswith('me_total') and val.ndim == 3: # Store only non-NaN slices (neuron, feature pairs) val_f16 = val.astype(np.float16) # Check which (neuron, feature) slices have any non-NaN data valid_mask = ~np.all(np.isnan(val_f16), axis=2) valid_indices = np.argwhere(valid_mask).astype(np.int16) valid_data = val_f16[valid_mask] arrays[f'info_{key}_indices'] = valid_indices arrays[f'info_{key}_data'] = valid_data arrays[f'info_{key}_shape'] = np.array(val.shape, dtype=np.int32) else: arrays[f'info_{key}'] = val elif isinstance(val, list) and len(val) > 0 and hasattr(val[0], 'values'): # List of DataFrames - store values only (stage stats) for i, df in enumerate(val): arrays[f'info_{key}_{i}'] = df.values else: info_scalars[key] = val # 2. Extract disentanglement arrays disent_json = {} if hasattr(results, 'disentanglement') and results.disentanglement: disent = results.disentanglement for key in ['feat_feat_similarity', 'feat_feat_significance', 'disent_matrix', 'count_matrix']: if key in disent and disent[key] is not None: arrays[f'disent_{key}'] = disent[key] disent_json = { 'feature_names': disent.get('feature_names', []), 'summary': _numpy_to_python(disent.get('summary', {})), } # 3. Embed dicts as JSON strings in NPZ if hasattr(results, 'stats') and results.stats: arrays['_stats_json'] = np.array([json.dumps(_numpy_to_python(results.stats))]) if hasattr(results, 'significance') and results.significance: arrays['_significance_json'] = np.array([json.dumps(_numpy_to_python(results.significance))]) params_data = { 'intense_params': _numpy_to_python(getattr(results, 'intense_params', {})), 'info_scalars': _numpy_to_python(info_scalars), } arrays['_params_json'] = np.array([json.dumps(params_data)]) if disent_json: arrays['_disentanglement_json'] = np.array([json.dumps(disent_json)]) # 4. Save (compressed or uncompressed) if compressed: np.savez_compressed(fname, **arrays) else: np.savez(fname, **arrays)
[docs] def load_results(fname): """Load IntenseResults from NPZ file. Parameters ---------- fname : str or Path Path to the NPZ file. Returns ------- IntenseResults Reconstructed results object. Examples -------- >>> from driada.intense.io import IntenseResults, save_results, load_results >>> results = IntenseResults() >>> results.update('stats', {'cell1': {'feat1': {'me': 0.5}}}) >>> save_results(results, 'test.npz') >>> loaded = load_results('test.npz') >>> loaded.stats['cell1']['feat1']['me'] 0.5 """ data = np.load(fname, allow_pickle=True) results = IntenseResults() # 1. Load JSON-embedded dicts if '_stats_json' in data: results.update('stats', json.loads(str(data['_stats_json'][0]))) if '_significance_json' in data: results.update('significance', json.loads(str(data['_significance_json'][0]))) if '_params_json' in data: params = json.loads(str(data['_params_json'][0])) results.update('intense_params', params.get('intense_params', {})) info_scalars = params.get('info_scalars', {}) else: info_scalars = {} # 2. Reconstruct info dict with arrays info = dict(info_scalars) # First pass: identify sparse arrays (me_total with _indices, _data, _shape) sparse_keys = set() for key in data.files: if key.startswith('info_') and key.endswith('_shape'): base_key = key[5:-6] # Remove 'info_' prefix and '_shape' suffix sparse_keys.add(base_key) # Reconstruct sparse arrays for base_key in sparse_keys: indices = data[f'info_{base_key}_indices'] sparse_data = data[f'info_{base_key}_data'] shape = tuple(data[f'info_{base_key}_shape']) # Reconstruct dense array filled with NaN arr = np.full(shape, np.nan, dtype=sparse_data.dtype) for i, (ni, fi) in enumerate(indices): arr[ni, fi] = sparse_data[i] info[base_key] = arr # Second pass: load regular arrays for key in data.files: if key.startswith('info_') and not key.startswith('info_stage'): info_key = key[5:] # Remove 'info_' prefix # Skip sparse array components if any(info_key.startswith(sk) or info_key.endswith('_indices') or info_key.endswith('_data') or info_key.endswith('_shape') for sk in sparse_keys): continue arr = data[key] info[info_key] = arr if info: results.update('info', info) # 3. Reconstruct disentanglement if '_disentanglement_json' in data: disent = json.loads(str(data['_disentanglement_json'][0])) for key in ['feat_feat_similarity', 'feat_feat_significance', 'disent_matrix', 'count_matrix']: npz_key = f'disent_{key}' if npz_key in data: disent[key] = data[npz_key] results.update('disentanglement', disent) return results