Source code for driada.dim_reduction.embedding

import numpy as np
from scipy.sparse.linalg import eigs
import umap.umap_ as umap
from pydiffmap import diffusion_map as dm
from scipy.sparse.csgraph import shortest_path

from sklearn.decomposition import PCA
from sklearn.manifold import spectral_embedding, Isomap, LocallyLinearEmbedding, TSNE
from sklearn.model_selection import train_test_split

# from sklearn.cluster.spectral import discretize

# warnings.filterwarnings("ignore")

from .dr_base import e_param_filter
from .graph import ProximityGraph

from .mvu import MaximumVarianceUnfolding

from ..network.matrix_utils import get_inv_sqrt_diag_matrix
from ..utils.data import check_positive, check_nonnegative


[docs] class Embedding: """Low-dimensional representation of high-dimensional data. This is an internal class typically created by MVData.get_embedding(). It provides a unified interface for various dimensionality reduction methods including linear (PCA), non-linear manifold learning (Isomap, LLE, UMAP), spectral methods (Laplacian Eigenmaps, Diffusion Maps), and neural network-based approaches (autoencoders, VAEs). Parameters ---------- init_data : ndarray Input data matrix of shape (n_features, n_samples). init_distmat : ndarray or None Precomputed distance matrix of shape (n_samples, n_samples). Used by methods like MDS when available. labels : array-like Labels for data points, used for visualization and evaluation. params : dict Filtered embedding parameters from e_param_filter(). Must contain: - 'e_method' : DRMethod object from METHODS_DICT - 'e_method_name' : str, method name (e.g., 'pca', 'umap') - 'dim' : int, target embedding dimension May also contain method-specific keys: - 'min_dist' : float, for UMAP only - 'dm_alpha' : float, for dmaps/auto_dmaps only - 'dm_t' : int, for dmaps/auto_dmaps only g : ProximityGraph, optional Precomputed proximity graph. Required for graph-based methods (LE, Isomap, LLE, etc.). If None, must be created before building. Attributes ---------- graph : ProximityGraph The proximity graph used by graph-based methods. coords : ndarray or None Embedding coordinates of shape (dim, n_samples). None until build() is called. labels : array-like Data point labels. nclasses : int Number of unique classes in labels. init_data : ndarray Original high-dimensional data. init_distmat : ndarray or None Precomputed distance matrix if provided. all_params : dict Filtered parameters from e_param_filter. transformation_matrix : ndarray or None For linear methods (PCA), the transformation matrix to project new data. nnmodel : nn.Module or None For neural network methods, the trained model. nn_loss : float or None For neural network methods, the final loss value. reducer\_ : object The underlying reducer object (sklearn model, etc.) for potential reuse. Plus all parameters from params dict set as attributes via setattr. Methods ------- build(kwargs=None) Build the embedding using the specified method. create_pca_embedding_(verbose=True) Linear projection via Principal Component Analysis. create_mds_embedding_() Classical multidimensional scaling preserving distances. create_isomap_embedding_() Non-linear embedding preserving geodesic distances. create_lle_embedding_() Locally Linear Embedding assuming local linearity. create_hlle_embedding_() Hessian LLE with better curvature preservation. create_mvu_embedding_() Maximum Variance Unfolding via semidefinite programming. create_le_embedding_() Laplacian Eigenmaps using graph spectral decomposition. create_auto_le_embedding_() Laplacian Eigenmaps using sklearn implementation. create_dmaps_embedding_() Diffusion Maps with manual implementation. create_auto_dmaps_embedding_() Diffusion Maps using pydiffmap library. create_tsne_embedding_() t-SNE for visualization with local structure preservation. create_umap_embedding_() UMAP balancing local and global structure. create_ae_embedding_(**kwargs) Autoencoder with optional correlation/MI losses (deprecated). create_vae_embedding_(**kwargs) Variational autoencoder (deprecated). create_flexible_ae_embedding_(**kwargs) Flexible autoencoder with modular loss composition. continue_learning(add_epochs, kwargs={}) Continue training neural network models. to_mvdata() Convert embedding to MVData for further processing. Notes ----- - Graph-based methods require a ProximityGraph to be provided or created - Neural methods (AE/VAE) require PyTorch to be installed - Some methods (MVU) require additional dependencies (cvxpy) - Coordinates are stored as (dim, n_samples) for consistency - The ``auto_`` prefix indicates methods using external libraries Examples -------- Direct instantiation (internal use): >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> import numpy as np >>> data = np.random.randn(100, 500) # 100 features, 500 samples >>> labels = np.random.randint(0, 3, 500) >>> # Parameters after e_param_filter >>> params = { ... 'e_method': METHODS_DICT['pca'], ... 'e_method_name': 'pca', ... 'dim': 2 ... } >>> embedding = Embedding(data, None, labels, params) >>> embedding.build(kwargs={'verbose': False}) >>> embedding.coords.shape (2, 500) Typical usage via MVData (recommended): >>> from driada.dim_reduction.data import MVData >>> mvdata = MVData(data, labels=labels) >>> # PCA embedding >>> embedding = mvdata.get_embedding(method='pca', dim=2, verbose=False) >>> embedding.coords.shape (2, 500) >>> # UMAP with custom parameters >>> embedding = mvdata.get_embedding( ... method='umap', ... dim=2, ... n_neighbors=15, ... min_dist=0.1 ... ) """
[docs] def __init__(self, init_data, init_distmat, labels, params, g=None): """Initialize Embedding object. Parameters ---------- init_data : array-like Initial data matrix with shape (n_features, n_samples) init_distmat : array-like or None Pre-computed distance matrix, optional labels : array-like Labels for each data point params : dict Dictionary of embedding parameters, must include 'e_method', 'e_method_name', and method-specific parameters g : ProximityGraph, optional Pre-computed proximity graph. If None, may be computed later depending on the embedding method Raises ------ TypeError If g is provided but not a ProximityGraph instance. AttributeError If required params keys are missing during attribute access. Notes ----- All keys in params dict are set as instance attributes via setattr. The params are filtered through e_param_filter before use. Examples -------- >>> # Typically created via MVData.get_embedding(), not directly >>> import numpy as np >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> data = np.random.randn(100, 50) # 100 features, 50 samples >>> labels = np.repeat([0, 1], 25) >>> params = {'e_method': METHODS_DICT['pca'], 'e_method_name': 'pca', 'dim': 2} >>> emb = Embedding(data, None, labels, params) """ if g is not None: if isinstance(g, ProximityGraph): self.graph = g else: raise TypeError("Graph must be ProximityGraph instance") self.all_params = e_param_filter(params) for key in params: setattr(self, key, params[key]) self.init_data = init_data self.init_distmat = init_distmat if self.e_method.is_linear: self.transformation_matrix = None self.labels = labels self.coords = None try: self.nclasses = len(set(self.labels)) except (TypeError, AttributeError): # Fixed: was returning array instead of count self.nclasses = len(np.unique(self.labels)) if self.e_method.nn_based: self.nnmodel = None
[docs] def build(self, kwargs=None): """Build the embedding using the specified method. Dynamically calls the appropriate embedding creation method based on the embedding method name (e.g., 'pca' calls ``create_pca_embedding_``). Parameters ---------- kwargs : dict, optional Additional keyword arguments passed to the specific embedding method. For neural network methods (AE/VAE), this includes training parameters. Returns ------- None Modifies self.coords in-place with embedding coordinates. Raises ------ AttributeError If e_method_name is invalid or corresponding method not found. If self.graph is None for methods requiring it. Exception If the graph is disconnected and the method cannot handle it. Examples -------- >>> import numpy as np >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> data = np.random.randn(20, 100) # 20 features, 100 samples >>> labels = np.zeros(100) >>> params = {'e_method': METHODS_DICT['pca'], 'e_method_name': 'pca', 'dim': 2} >>> emb = Embedding(data, None, labels, params) >>> emb.build(kwargs={'verbose': False}) # For PCA, no graph needed >>> emb.coords.shape (2, 100) """ if kwargs is None: kwargs = dict() fn = getattr(self, "create_" + self.e_method_name + "_embedding_") if self.e_method.requires_graph: if not self.graph.is_connected() and not self.e_method.handles_disconnected_graphs: raise Exception("Graph is not connected!") fn(**kwargs)
[docs] def create_pca_embedding_(self, verbose=True): """Create PCA (Principal Component Analysis) embedding. Linear dimensionality reduction using orthogonal transformation to convert data into linearly uncorrelated components ordered by variance. Parameters ---------- verbose : bool, default=True Whether to print progress messages. Returns ------- None Sets self.coords to shape (dim, n_samples). Raises ------ AttributeError If self.dim is not set. ValueError If PCA fails (e.g., dim > n_features). Notes ----- Sets self.coords to shape (dim, n_samples) and stores the PCA object in ``self.reducer_`` for potential reuse or analysis. Data is transposed before fitting since init_data is (n_features, n_samples) while sklearn expects (n_samples, n_features). Examples -------- >>> import numpy as np >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> data = np.random.randn(10, 500) # 10 features, 500 samples >>> labels = np.random.randint(0, 3, 500) >>> params = {'e_method': METHODS_DICT['pca'], 'e_method_name': 'pca', 'dim': 2} >>> emb = Embedding(data, None, labels, params) >>> emb.create_pca_embedding_(verbose=False) >>> emb.coords.shape (2, 500) """ if verbose: print("Calculating PCA embedding...") pca = PCA(n_components=self.dim) self.coords = pca.fit_transform(self.init_data.T).T self.reducer_ = pca
# print(pca.explained_variance_ratio_)
[docs] def create_isomap_embedding_(self): """Create Isomap embedding using geodesic distances. Non-linear dimensionality reduction through isometric mapping. Preserves geodesic distances between all points by first computing shortest paths on the neighborhood graph, then applying MDS. Returns ------- None Sets self.coords to shape (dim, n_samples). Raises ------ AttributeError If self.graph, self.dim, or self.nn not set. MemoryError If converting sparse matrix to dense fails. Notes ----- Requires a proximity graph. Uses Dijkstra's algorithm to compute shortest paths, then applies classical MDS to the geodesic distance matrix. Warning: Converts sparse adjacency to dense matrix which may use excessive memory for large datasets. The Isomap object is stored in ``self.reducer_`` for potential reuse. Examples -------- >>> import numpy as np >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> from driada.dim_reduction.graph import ProximityGraph >>> data = np.random.randn(10, 500) # 10 features, 500 samples >>> labels = np.random.randint(0, 3, 500) >>> params = {'e_method': METHODS_DICT['isomap'], 'e_method_name': 'isomap', 'dim': 2} >>> emb = Embedding(data, None, labels, params) >>> m_params = {'metric_name': 'euclidean'} >>> g_params = {'g_method_name': 'knn', 'nn': 10, 'weighted': False} >>> emb.graph = ProximityGraph(data, m_params, g_params) >>> emb.create_isomap_embedding_() >>> emb.coords.shape[0] # Number of dimensions 2 """ # Validate graph exists if not hasattr(self, "graph") or self.graph is None: raise AttributeError("Graph not built. Call build_graph() first.") # Validate parameters check_positive(dim=self.dim, nn=self.graph.nn) A = self.graph.adj isomap_reducer = Isomap( n_components=self.dim, n_neighbors=self.graph.nn, metric="precomputed" ) # self.coords = sp.csr_matrix(map.fit_transform(self.graph.data.A.T).T) spmatrix = shortest_path(A.todense(), method="D", directed=False) self.coords = isomap_reducer.fit_transform(spmatrix).T self.reducer_ = isomap_reducer
[docs] def create_mds_embedding_(self): """Create MDS (Multi-Dimensional Scaling) embedding. Classical MDS finds a low-dimensional representation that preserves pairwise distances between points. Works with either a pre-computed distance matrix or calculates distances from the data. Returns ------- None Sets self.coords to shape (dim, n_samples). Raises ------ ImportError If sklearn.manifold.MDS not available. AttributeError If self.dim not set. Notes ----- Sets self.coords to shape (dim, n_samples) containing the MDS coordinates. If init_distmat is provided, uses it as a precomputed distance matrix. Otherwise, computes Euclidean distances from init_data. The algorithm minimizes the stress function: stress = sum((d_ij - ||x_i - x_j||)^2) where d_ij are the input distances. Uses fixed random_state=42 for reproducibility - to be changed in the future. The MDS object is stored in ``self.reducer_`` for potential reuse. Examples -------- >>> import numpy as np >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> from scipy.spatial.distance import pdist, squareform >>> data = np.random.randn(10, 500) # 10 features, 500 samples >>> labels = np.random.randint(0, 3, 500) >>> distmat = squareform(pdist(data.T)) >>> params = {'e_method': METHODS_DICT['mds'], 'e_method_name': 'mds', 'dim': 2} >>> emb = Embedding(data, distmat, labels, params) >>> emb.create_mds_embedding_() >>> emb.coords.shape (2, 500) """ from sklearn.manifold import MDS # MDS typically uses a distance matrix if hasattr(self, "init_distmat") and self.init_distmat is not None: # Use provided distance matrix mds = MDS(n_components=self.dim, dissimilarity="precomputed", random_state=42) self.coords = mds.fit_transform(self.init_distmat).T else: # Compute from data mds = MDS(n_components=self.dim, random_state=42) self.coords = mds.fit_transform(self.init_data.T).T self.reducer_ = mds
[docs] def create_mvu_embedding_(self): """Create Maximum Variance Unfolding (MVU) embedding. Non-linear dimensionality reduction that "unfolds" a manifold by maximizing variance while preserving local distances. Solves a semidefinite programming problem to find the optimal embedding. Sets self.coords to shape (dim, n_samples) embedding coordinates and ``self.reducer_`` to the fitted MVU object. Raises ------ ImportError If cvxpy is not installed. AttributeError If self.graph or required attributes are not set. ValueError If self.dim is not positive or if solver fails to converge. Notes ----- Requires cvxpy for convex optimization. Uses the SCS solver with reasonable defaults for most datasets. The embedding preserves local neighborhood structure while maximizing global variance. The optimization problem maximizes tr(Y'Y) subject to: - Local distance preservation: ||y_i - y_j||² = ||x_i - x_j||² for neighbors - Centering: ∑y_i = 0 Examples -------- >>> import numpy as np >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> from driada.dim_reduction.graph import ProximityGraph >>> data = np.random.randn(10, 500) # 10 features, 500 samples >>> labels = np.random.randint(0, 3, 500) >>> params = {'e_method': METHODS_DICT['mvu'], 'e_method_name': 'mvu', 'dim': 2} >>> emb = Embedding(data, None, labels, params) >>> m_params = {'metric_name': 'euclidean'} >>> g_params = {'g_method_name': 'knn', 'nn': 10, 'weighted': False} >>> emb.graph = ProximityGraph(data, m_params, g_params) >>> emb.create_mvu_embedding_() >>> emb.coords.shape (2, 500) """ # Validate required attributes if not hasattr(self, "graph") or self.graph is None: raise AttributeError( "Graph must be created before MVU embedding. Call create_graph() first." ) if not hasattr(self, "dim"): raise AttributeError("Embedding dimension 'dim' not set.") check_positive(dim=self.dim) try: import cvxpy as cp except ImportError: raise ImportError( "cvxpy is required for MVU but not installed. " "Install it with: pip install cvxpy or conda install -c conda-forge cvxpy" ) mvu = MaximumVarianceUnfolding( equation="berkley", solver=cp.SCS, solver_tol=1e-2, eig_tol=1.0e-10, solver_iters=2500, warm_start=False, seed=None, ) self.coords = mvu.fit_transform(self.graph.data.T, self.dim, self.graph.nn).T self.reducer_ = mvu
[docs] def create_lle_embedding_(self): """Create Locally Linear Embedding (LLE). Non-linear dimensionality reduction that assumes data lies on a locally linear manifold. Each point is reconstructed from its neighbors, and these weights are preserved in lower dimensions. Sets self.coords to shape (dim, n_samples) embedding coordinates and ``self.reducer_`` to the fitted sklearn LLE object. Raises ------ AttributeError If self.graph or required attributes are not set. ValueError If self.dim is not positive or n_neighbors is invalid. Notes ----- Uses sklearn's LocallyLinearEmbedding implementation. The algorithm: 1. Finds k nearest neighbors for each point 2. Computes weights to reconstruct each point from neighbors 3. Finds low-d embedding preserving reconstruction weights Time complexity is O(DNlog(k)N + DNk³) for N points in D dimensions. Examples -------- >>> import numpy as np >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> from driada.dim_reduction.graph import ProximityGraph >>> data = np.random.randn(10, 500) # 10 features, 500 samples >>> labels = np.random.randint(0, 3, 500) >>> params = {'e_method': METHODS_DICT['lle'], 'e_method_name': 'lle', 'dim': 2} >>> emb = Embedding(data, None, labels, params) >>> m_params = {'metric_name': 'euclidean'} >>> g_params = {'g_method_name': 'knn', 'nn': 10, 'weighted': False} >>> emb.graph = ProximityGraph(data, m_params, g_params) >>> emb.create_lle_embedding_() >>> emb.coords.shape (2, 500) References ---------- Roweis & Saul (2000). Nonlinear dimensionality reduction by locally linear embedding. Science 290:2323-2326.""" # Validate required attributes if not hasattr(self, "graph") or self.graph is None: raise AttributeError( "Graph must be created before LLE embedding. Call create_graph() first." ) if not hasattr(self, "dim"): raise AttributeError("Embedding dimension 'dim' not set.") check_positive(dim=self.dim) check_positive(n_neighbors=self.graph.nn) lle = LocallyLinearEmbedding(n_components=self.dim, n_neighbors=self.graph.nn) self.coords = lle.fit_transform(self.graph.data.T).T self.reducer_ = lle
[docs] def create_hlle_embedding_(self): """Create Hessian Locally Linear Embedding (HLLE). Modified version of LLE that uses Hessian-based regularization to better preserve local geometric structure. Particularly effective for manifolds with varying curvature. Sets self.coords to shape (dim, n_samples) embedding coordinates and ``self.reducer_`` to the fitted sklearn HLLE object. Raises ------ AttributeError If self.graph or required attributes are not set. ValueError If self.dim is not positive or if the constraint n_neighbors > n_components * (n_components + 3) / 2 is not satisfied. Notes ----- Requires n_neighbors > n_components * (n_components + 3) / 2 for the Hessian estimation. More computationally intensive than standard LLE but often produces better embeddings for complex manifolds. The algorithm estimates the Hessian at each point to capture local curvature, then finds an embedding that preserves this structure. Examples -------- >>> import numpy as np >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> from driada.dim_reduction.graph import ProximityGraph >>> data = np.random.randn(10, 500) # 10 features, 500 samples >>> labels = np.random.randint(0, 3, 500) >>> params = {'e_method': METHODS_DICT['hlle'], 'e_method_name': 'hlle', 'dim': 2} >>> emb = Embedding(data, None, labels, params) >>> # For 2D embedding, need at least 6 neighbors >>> m_params = {'metric_name': 'euclidean'} >>> g_params = {'g_method_name': 'knn', 'nn': 10, 'weighted': False} >>> emb.graph = ProximityGraph(data, m_params, g_params) >>> emb.create_hlle_embedding_() # doctest: +SKIP References ---------- Donoho, D. & Grimes, C. (2003). Hessian eigenmaps: Locally linear embedding techniques for high-dimensional data. PNAS.""" # Validate required attributes if not hasattr(self, "graph") or self.graph is None: raise AttributeError( "Graph must be created before HLLE embedding. Call create_graph() first." ) if not hasattr(self, "dim"): raise AttributeError("Embedding dimension 'dim' not set.") check_positive(dim=self.dim) check_positive(n_neighbors=self.graph.nn) # Check HLLE-specific constraint min_neighbors = int(self.dim * (self.dim + 3) / 2) + 1 if self.graph.nn <= min_neighbors: raise ValueError( f"HLLE requires n_neighbors > {min_neighbors} for {self.dim}D embedding, " f"but got n_neighbors={self.graph.nn}. Increase nn parameter." ) hlle = LocallyLinearEmbedding( n_components=self.dim, n_neighbors=self.graph.nn, method="hessian" ) self.coords = hlle.fit_transform(self.graph.data.T).T self.reducer_ = hlle
[docs] def create_le_embedding_(self): """Create Laplacian Eigenmaps (LE) embedding. Spectral embedding method that uses eigenvectors of the graph Laplacian to embed nodes while preserving local neighborhood structure. Particularly effective for data lying on low-dimensional manifolds. Sets self.coords to shape (dim, n_samples) containing the embedding. Raises ------ AttributeError If self.graph or required attributes are not set. ValueError If self.dim is not positive or if the graph is disconnected (multiple eigenvalues equal to 1). Notes ----- Uses the transition matrix eigenvectors (more stable than Laplacian). Normalizes eigenvectors by node degree to ensure proper embedding. The embedding minimizes: sum_ij W_ij ||y_i - y_j||^2 subject to orthogonality constraints. The algorithm: 1. Computes transition matrix P = D^(-1)A 2. Finds top eigenvectors of P (excluding trivial) 3. Normalizes by degree for proper embedding Examples -------- >>> import numpy as np >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> from driada.dim_reduction.graph import ProximityGraph >>> data = np.random.randn(10, 500) # 10 features, 500 samples >>> labels = np.random.randint(0, 3, 500) >>> params = {'e_method': METHODS_DICT['le'], 'e_method_name': 'le', 'dim': 2} >>> emb = Embedding(data, None, labels, params) >>> m_params = {'metric_name': 'euclidean'} >>> g_params = {'g_method_name': 'knn', 'nn': 10, 'weighted': False} >>> emb.graph = ProximityGraph(data, m_params, g_params) >>> emb.create_le_embedding_() >>> emb.coords.shape[0] # Number of dimensions 2 References ---------- Belkin, M. & Niyogi, P. (2003). Laplacian eigenmaps for dimensionality reduction and data representation. Neural Computation.""" # Validate required attributes if not hasattr(self, "graph") or self.graph is None: raise AttributeError( "Graph must be created before LE embedding. Call create_graph() first." ) if not hasattr(self, "dim"): raise AttributeError("Embedding dimension 'dim' not set.") check_positive(dim=self.dim) A = self.graph.adj dim = self.dim n = self.graph.n DH = get_inv_sqrt_diag_matrix(A) P = self.graph.get_matrix("trans") start_v = np.ones(n) # LR mode is much more stable, this is why we use P matrix largest eigenvalues eigvals, eigvecs = eigs(P, k=dim + 1, which="LR", v0=start_v, maxiter=n * 1000) # eigvals, vecs = eigs(nL, k = dim2 + 1, which = 'SM') eigvals = np.asarray([np.round(np.real(x), 6) for x in eigvals]) # Check for disconnected graph (multiple eigenvalues close to 1) if np.sum(np.abs(eigvals - 1.0) < 1e-10) > 1: raise ValueError( "Graph appears to be disconnected (multiple eigenvalues ≈ 1). " "Laplacian Eigenmaps requires a connected graph. " "Consider increasing nn parameter or using a different method." ) else: vecs = np.real(eigvecs.T[1:]) vec_norms = np.array([np.real(sum([x * x for x in v])) for v in vecs]) vecs = vecs / vec_norms[:, np.newaxis] vecs = vecs.dot(DH.toarray()) self.coords = vecs
[docs] def create_auto_le_embedding_(self): """Create Laplacian Eigenmaps embedding using sklearn's implementation. Alternative implementation using sklearn's spectral_embedding function. More robust and handles edge cases better than the manual implementation. Sets self.coords to shape (dim, n_samples). Raises ------ AttributeError If self.graph or required attributes are not set. ValueError If self.dim is not positive. MemoryError If the graph is too large to convert to dense format. Notes ----- Uses normalized Laplacian by default for better numerical stability. Automatically handles disconnected graphs by dropping the first eigenvector. Warning: Converts sparse adjacency matrix to dense format, which may cause memory issues for graphs with more than ~10,000 nodes. This is the recommended method for most use cases unless you need fine control over the eigendecomposition process. Examples -------- >>> import numpy as np >>> from driada.dim_reduction.dr_base import METHODS_DICT >>> from driada.dim_reduction.graph import ProximityGraph >>> data = np.random.randn(10, 500) # 10 features, 500 samples >>> labels = np.random.randint(0, 3, 500) >>> params = {'e_method': METHODS_DICT['auto_le'], 'e_method_name': 'auto_le', 'dim': 2} >>> emb = Embedding(data, None, labels, params) >>> m_params = {'metric_name': 'euclidean'} >>> g_params = {'g_method_name': 'knn', 'nn': 10, 'weighted': False} >>> emb.graph = ProximityGraph(data, m_params, g_params) >>> emb.create_auto_le_embedding_() >>> emb.coords.shape[0] # Number of dimensions 2 """ # Validate required attributes if not hasattr(self, "graph") or self.graph is None: raise AttributeError( "Graph must be created before auto LE embedding. Call create_graph() first." ) if not hasattr(self, "dim"): raise AttributeError("Embedding dimension 'dim' not set.") check_positive(dim=self.dim) A = self.graph.adj dim = self.dim # Warn about memory usage for large graphs if A.shape[0] > 10000: import warnings warnings.warn( f"Converting sparse matrix with {A.shape[0]} nodes to dense format. " "This may use significant memory. Consider using create_le_embedding_() instead.", MemoryWarning, ) A = A.asfptype() # Convert to numpy array instead of matrix to avoid sklearn compatibility issues vecs = spectral_embedding( np.asarray(A.todense()), n_components=dim, eigen_solver=None, random_state=None, eigen_tol=0.0, norm_laplacian=True, drop_first=True, ).T self.coords = vecs
[docs] def create_dmaps_embedding_(self): """Create diffusion maps embedding. Implements the standard diffusion maps algorithm with alpha normalization for anisotropic diffusion and diffusion time parameter t. Raises ------ AttributeError If graph is not built or required attributes are missing. ValueError If dim is invalid, graph has isolated nodes, or eigendecomposition fails. Notes ----- The algorithm performs the following steps: 1. Apply alpha normalization to adjacency matrix 2. Create Markov transition matrix 3. Compute eigendecomposition 4. Scale eigenvectors by eigenvalues^t Future enhancement: Variable bandwidth diffusion maps - Berry & Harlim (2016): "Variable bandwidth diffusion kernels" - DOI: https://doi.org/10.1016/j.acha.2015.01.001 - Would allow adaptive kernel bandwidth based on local density References ---------- - Coifman & Lafon (2006): Diffusion maps - DOI: https://doi.org/10.1016/j.acha.2006.04.006""" import numpy as np from scipy.sparse import csr_matrix, diags from scipy.sparse.linalg import eigsh # Validate graph exists if not hasattr(self, "graph") or self.graph is None: raise AttributeError("Graph not built. Call build_graph() first.") # Get and validate parameters alpha = self.dm_alpha if hasattr(self, "dm_alpha") else 0.5 t = self.dm_t if hasattr(self, "dm_t") else 1 # Diffusion time parameter check_positive(dim=self.dim, t=t) check_nonnegative(alpha=alpha) # Check dimension validity n_samples = self.graph.adj.shape[0] if self.dim >= n_samples: raise ValueError(f"dim ({self.dim}) must be less than n_samples ({n_samples})") # Get affinity matrix from graph W = self.graph.adj.astype(float) # Ensure the matrix is symmetric W = (W + W.T) / 2 # Apply alpha normalization (anisotropic diffusion) # First compute the degree matrix D = np.asarray(W.sum(axis=1)).flatten() # Check for isolated nodes if np.any(D == 0): raise ValueError("Graph contains isolated nodes with zero degree") D_alpha = D**alpha # Normalize by D^alpha from both sides D_alpha_inv = 1.0 / D_alpha W_alpha = diags(D_alpha_inv) @ W @ diags(D_alpha_inv) # Compute new degree matrix for normalized kernel D_alpha_norm = np.asarray(W_alpha.sum(axis=1)).flatten() # Check for numerical issues if np.any(D_alpha_norm == 0): raise ValueError("Alpha normalization resulted in zero row sums") D_alpha_norm_inv = 1.0 / D_alpha_norm # Create Markov transition matrix P = diags(D_alpha_norm_inv) @ W_alpha # Compute eigendecomposition # We need the largest eigenvalues (close to 1) try: # For sparse matrices, use eigsh if hasattr(P, "toarray"): eigenvalues, eigenvectors = eigsh(P, k=self.dim + 1, which="LM", tol=1e-6) else: # For dense matrices, convert to sparse first P_sparse = csr_matrix(P) eigenvalues, eigenvectors = eigsh(P_sparse, k=self.dim + 1, which="LM", tol=1e-6) except Exception: # Fallback to dense computation P_dense = P.toarray() if hasattr(P, "toarray") else P eigenvalues_all, eigenvectors_all = np.linalg.eig(P_dense) # Sort by magnitude idx = np.abs(eigenvalues_all).argsort()[::-1] eigenvalues = eigenvalues_all[idx[: self.dim + 1]] eigenvectors = eigenvectors_all[:, idx[: self.dim + 1]] # Sort eigenvalues/vectors in descending order idx = eigenvalues.argsort()[::-1] eigenvalues = eigenvalues[idx] eigenvectors = eigenvectors[:, idx] # Remove the first eigenvector (constant, eigenvalue=1) eigenvalues = eigenvalues[1 : self.dim + 1] eigenvectors = eigenvectors[:, 1 : self.dim + 1] # Apply diffusion time scaling: lambda^t eigenvalues_t = eigenvalues**t # Scale eigenvectors by eigenvalues^t # For very small t or when eigenvalues are close to 1, this preserves more variance self.coords = (eigenvectors * eigenvalues_t).T # Store additional info self.reducer_ = { "eigenvalues": eigenvalues, "eigenvectors": eigenvectors, "alpha": alpha, "t": t, }
[docs] def create_auto_dmaps_embedding_(self): """Create diffusion maps embedding using pydiffmap library. Alternative implementation using the pydiffmap library which provides automatic bandwidth selection via the Berry-Harlim-Gao (BGH) method. More sophisticated than the manual implementation. Raises ------ AttributeError If graph is not built. ValueError If parameters are invalid or pydiffmap fails. ImportError If pydiffmap is not installed. Notes ----- Sets self.coords to shape (dim, n_samples). Uses epsilon='bgh' for automatic bandwidth selection based on local geometry. The alpha parameter controls the degree of density normalization (alpha=0: no normalization, alpha=1: full Fokker-Planck normalization). This method is preferred when you want automatic parameter tuning and don't need fine control over the diffusion process.""" # Validate graph exists if not hasattr(self, "graph") or self.graph is None: raise AttributeError("Graph not built. Call build_graph() first.") # Validate parameters check_positive(dim=self.dim, nn=self.graph.nn) alpha = self.dm_alpha if hasattr(self, "dm_alpha") else 1 check_nonnegative(alpha=alpha) dim = self.dim nn = self.graph.nn metric = self.graph.metric metric_args = self.graph.metric_args try: from pydiffmap import diffusion_map as dm except ImportError: raise ImportError("pydiffmap not installed. Install with: pip install pydiffmap") try: mydmap = dm.DiffusionMap.from_sklearn( n_evecs=dim, k=nn, epsilon="bgh", metric=metric, metric_params=metric_args, alpha=alpha, ) dmap = mydmap.fit_transform(self.init_data.T) self.coords = dmap.T self.reducer_ = dmap except Exception as e: raise ValueError(f"Diffusion maps computation failed: {str(e)}")
[docs] def create_tsne_embedding_(self): """Create t-SNE (t-distributed Stochastic Neighbor Embedding). Non-linear dimensionality reduction that converts similarities between data points to joint probabilities and minimizes KL divergence between high-dimensional and low-dimensional distributions. Raises ------ ValueError If dim is invalid or t-SNE computation fails. ImportError If scikit-learn is not installed. Notes ----- Sets self.coords to shape (dim, n_samples). Particularly effective for visualization (dim=2 or 3). Non-parametric: cannot embed new points without refitting. Stochastic: different runs may produce different results. The perplexity parameter (related to number of neighbors) controls the balance between local and global structure preservation. References ---------- van der Maaten, L. & Hinton, G. (2008). Visualizing data using t-SNE. Journal of Machine Learning Research.""" # Validate parameters check_positive(dim=self.dim) try: from sklearn.manifold import TSNE except ImportError: raise ImportError("scikit-learn not installed") # Use self.verbose if available, otherwise silent verbose = getattr(self, "verbose", 0) try: perplexity = getattr(self, "perplexity", 30) random_state = getattr(self, "random_state", None) model = TSNE( n_components=self.dim, perplexity=perplexity, random_state=random_state, verbose=verbose, ) self.coords = model.fit_transform(self.init_data.T).T self.reducer_ = model except Exception as e: raise ValueError(f"t-SNE computation failed: {str(e)}")
[docs] def create_umap_embedding_(self): """Create UMAP (Uniform Manifold Approximation and Projection) embedding. State-of-the-art manifold learning technique based on Riemannian geometry and algebraic topology. Preserves both local and global structure better than t-SNE. Notes ----- Sets self.coords to shape (dim, n_samples). The min_dist parameter controls how tightly points are packed (smaller values = tighter packing). Unlike t-SNE, UMAP can transform new points after fitting. Advantages over t-SNE: - Preserves more global structure - Faster for large datasets - Supports supervised/semi-supervised modes - Can embed new points References ---------- McInnes, L., Healy, J., & Melville, J. (2018). UMAP: Uniform Manifold Approximation and Projection for Dimension Reduction. arXiv:1802.03426.""" # Validate graph and parameters if not hasattr(self, "graph") or self.graph is None: raise AttributeError("Graph not built. Call build_graph() first.") if not hasattr(self, "min_dist"): raise AttributeError("min_dist attribute not set") check_positive(dim=self.dim, nn=self.graph.nn) check_nonnegative(min_dist=self.min_dist) try: import umap except ImportError: raise ImportError("umap-learn not installed. Install with: pip install umap-learn") try: random_state = getattr(self, "random_state", None) reducer = umap.UMAP( n_neighbors=self.graph.nn, n_components=self.dim, min_dist=self.min_dist, random_state=random_state, ) # Use init_data, not graph.data self.coords = reducer.fit_transform(self.init_data.T).T self.reducer_ = reducer except Exception as e: raise ValueError(f"UMAP computation failed: {str(e)}")
def _prepare_data_loaders(self, batch_size, train_size, seed): """Prepare train and test data loaders for neural network methods. Parameters ---------- batch_size : int Batch size for data loaders. Must be positive. train_size : float Proportion of data to use for training. Must be in range (0, 1). seed : int Random seed for reproducible splits. Must be non-negative. Returns ------- train_loader : DataLoader Training data loader test_loader : DataLoader Test data loader device : torch.device Device to use for training (cuda if available, else cpu) Raises ------ ValueError If parameters are invalid. ImportError If PyTorch is not installed. Notes ----- Uses sklearn's train_test_split with shuffle=False to preserve temporal order in the data split.""" # Validate parameters check_positive(batch_size=batch_size) check_nonnegative(seed=seed) if not 0 < train_size < 1: raise ValueError(f"train_size must be in range (0, 1), got {train_size}") try: import torch from torch.utils.data import DataLoader from sklearn.model_selection import train_test_split from .neural import NeuroDataset except (ImportError, OSError) as e: if "torch" in str(e) or isinstance(e, OSError): raise ImportError( "PyTorch is required for autoencoder methods. " "Please install it with: pip install torch" ) else: raise ImportError("scikit-learn is required for data splitting") torch.manual_seed(seed) torch.backends.cudnn.benchmark = False torch.backends.cudnn.deterministic = True # Split data into train and test sets data_T = self.init_data.T # Transpose to (n_samples, n_features) indices = np.arange(data_T.shape[0]) # Use sklearn's train_test_split without shuffling to preserve temporal order train_indices, test_indices = train_test_split( indices, train_size=train_size, random_state=seed, shuffle=False ) # Create datasets with the split indices train_dataset = NeuroDataset(self.init_data[:, train_indices]) test_dataset = NeuroDataset(self.init_data[:, test_indices]) train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True) # Use gpu if available device = torch.device("cuda" if torch.cuda.is_available() else "cpu") return train_loader, test_loader, device
[docs] def create_ae_embedding_( self, continue_learning=0, epochs=50, lr=1e-3, seed=42, batch_size=32, enc_kwargs=None, dec_kwargs=None, feature_dropout=0.2, train_size=0.8, inter_dim=100, verbose=True, add_corr_loss=False, corr_hyperweight=0, add_mi_loss=False, mi_hyperweight=0, minimize_mi_data=None, log_every=1, device=None, ): """Create autoencoder embedding. .. deprecated:: This method is deprecated. Use ``create_flexible_ae_embedding_`` instead for more flexibility and advanced loss functions. Parameters ---------- continue_learning : int, default=0 Whether to continue training existing model. epochs : int, default=50 Number of training epochs. lr : float, default=1e-3 Learning rate. seed : int, default=42 Random seed. batch_size : int, default=32 Batch size for training. enc_kwargs : dict, optional Encoder configuration. dec_kwargs : dict, optional Decoder configuration. feature_dropout : float, default=0.2 Feature dropout rate. train_size : float, default=0.8 Training set fraction. inter_dim : int, default=100 Hidden layer dimension. verbose : bool, default=True Print training progress. add_corr_loss : bool, default=False Add correlation loss to encourage decorrelated latent features. corr_hyperweight : float, default=0 Weight for correlation loss. add_mi_loss : bool, default=False Add MI-based orthogonality loss. mi_hyperweight : float, default=0 Weight for MI loss. minimize_mi_data : np.ndarray, optional External data to minimize correlation with (for MI loss). log_every : int, default=1 Logging frequency. device : torch.device, optional Device to run on. Raises ------ ValueError If parameters are invalid. ImportError If PyTorch is not installed. Notes ----- This method is deprecated in favor of ``create_flexible_ae_embedding_`` which provides more flexibility and advanced loss functions.""" # --------------------------------------------------------------------------- # Validate parameters check_positive(epochs=epochs, lr=lr, inter_dim=inter_dim, log_every=log_every) check_nonnegative(corr_hyperweight=corr_hyperweight, mi_hyperweight=mi_hyperweight) # Import torch dependencies (optional dependency) try: import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader from .neural import AE except (ImportError, OSError): raise ImportError( "PyTorch is required for autoencoder methods. " "Please install it with: pip install torch" ) # Get data loaders and device train_loader, test_loader, device_to_use = self._prepare_data_loaders( batch_size=batch_size, train_size=train_size, seed=seed ) if device is None: device = device_to_use # Set random seed again for model initialization torch.manual_seed(seed) if verbose: print("device:", device) if not continue_learning: # create a model from `AE` autoencoder class # load it to the specified device, either gpu or cpu # Ensure kwargs are dictionaries, not None if enc_kwargs is None: enc_kwargs = {} if dec_kwargs is None: dec_kwargs = {} model = AE( orig_dim=self.init_data.shape[0], inter_dim=inter_dim, code_dim=self.dim, enc_kwargs=enc_kwargs, dec_kwargs=dec_kwargs, device=device, ) model = model.to(device) else: model = self.nnmodel # create an optimizer object optimizer = optim.Adam(model.parameters(), lr=lr) if continue_learning and hasattr(self, "_optimizer_state"): optimizer.load_state_dict(self._optimizer_state) for param_group in optimizer.param_groups: param_group["lr"] = lr # mean-squared error loss criterion = nn.MSELoss() def correlation_loss(data): """Compute correlation loss to encourage decorrelated features. Parameters ---------- data : torch.Tensor Latent representations of shape (n_features, n_samples). Returns ------- torch.Tensor Average pairwise correlation magnitude as scalar loss. Notes ----- Computes average absolute correlation between all pairs of features, excluding self-correlations (diagonal).""" corr = torch.corrcoef(data) nv = corr.shape[0] closs = torch.abs( (torch.sum(torch.abs(corr)) - 1 * nv) / (nv**2 - nv) ) # average pairwise correlation amplitude return closs def data_orthogonality_loss(data, ortdata): """Compute orthogonality loss between embeddings and external data. Encourages embeddings to be uncorrelated with external data by penalizing correlation coefficients between all variable pairs. Parameters ---------- data : torch.Tensor Embedding latent representations of shape (n_features, n_samples). ortdata : torch.Tensor External data to minimize correlation with, shape (n_features2, n_samples). Returns ------- torch.Tensor Average absolute correlation between data and ortdata as scalar loss. Notes ----- This is a workaround for mutual information estimation. The loss computes average magnitude of correlations between all pairs of variables from data and ortdata.""" n1, n2 = data.shape[0], ortdata.shape[1] fulldata = torch.cat((data, ortdata), dim=0) corr = torch.corrcoef(fulldata) nvar = n1 * n2 closs = torch.abs( (torch.sum(torch.abs(corr))) / nvar ) # average pairwise correlation amplitude return closs # --------------------------------------------------------------------------- f_dropout = nn.Dropout(feature_dropout) best_test_epoch = -1 if continue_learning: best_test_loss = self.nn_loss best_test_model = model.state_dict().copy() best_optimizer_state = optimizer.state_dict() else: best_test_loss = 1e10 best_test_model = None best_optimizer_state = None for epoch in range(epochs): loss = 0 for batch_features, indices in train_loader: batch_features = batch_features.to(device) # reset the gradients back to zero # PyTorch accumulates gradients on subsequent backward passes optimizer.zero_grad() # compute reconstructions noisy_batch_features = ( f_dropout(torch.ones(batch_features.shape).to(device)) * batch_features ) outputs = model(noisy_batch_features.float()) code = model.encoder(noisy_batch_features.float()).T """ # ==================== MINE experiment ======================== from torch_mist.estimators import mine from torch_mist.utils.train import train_mi_estimator from torch_mist.utils import evaluate_mi minimize_mi_data = torch.tensor(minimize_mi_data[:, _]).float().to(device) print(minimize_mi_data.shape, code.shape) estimator = mine( x_dim=minimize_mi_data.shape[0], y_dim=code.shape[0], hidden_dims=[64, 32], ) # Train it on the given samples train_log = train_mi_estimator( estimator=estimator, train_data=(minimize_mi_data.T, code.T), batch_size=batch_size, max_iterations=1000, device=device, fast_train=True, max_epochs=10 ) # Evaluate the estimator on the entirety of the data estimated_mi = evaluate_mi( estimator=estimator, data=(minimize_mi_data.T.to(device), code.T.to(device)), batch_size=batch_size ) print(f"Mutual information estimated value: {estimated_mi} nats") # ==================== MINE experiment ======================== """ # compute training reconstruction loss train_loss = criterion(outputs, batch_features.float()) if add_mi_loss: ortdata = torch.tensor(minimize_mi_data[:, indices]).float().to(device) train_loss += mi_hyperweight * data_orthogonality_loss(code, ortdata) if add_corr_loss: train_loss += corr_hyperweight * correlation_loss(code) # compute accumulated gradients train_loss.backward() # perform parameter update based on current gradients optimizer.step() # add the mini-batch training loss to epoch loss loss += train_loss.item() # compute the epoch training loss loss = loss / len(train_loader) # display the epoch training loss if (epoch + 1) % log_every == 0: # compute loss on test part tloss = 0 for batch_features, indices in test_loader: batch_features = batch_features.to(device) # compute reconstructions noisy_batch_features = ( f_dropout(torch.ones(batch_features.shape).to(device)) * batch_features ) outputs = model(noisy_batch_features.float()) # compute test reconstruction loss test_loss = criterion(outputs, batch_features.float()) if add_mi_loss: ortdata = torch.tensor(minimize_mi_data[:, indices]).float().to(device) train_loss += mi_hyperweight * data_orthogonality_loss(code, ortdata) if add_corr_loss: code = model.encoder(noisy_batch_features.float()).T train_loss += corr_hyperweight * correlation_loss(code) tloss += test_loss.item() # compute the epoch test loss tloss = tloss / len(test_loader) if tloss < best_test_loss: best_test_loss = tloss best_test_epoch = epoch + 1 best_test_model = model.state_dict().copy() best_optimizer_state = optimizer.state_dict() if verbose: print( f"epoch : {epoch + 1}/{epochs}, train loss = {loss:.8f}, test loss = {tloss:.8f}" ) if verbose: if best_test_epoch != epochs + 1: print(f"best model: epoch {best_test_epoch}") if best_test_model is not None: model.load_state_dict(best_test_model) self.nnmodel = model self._optimizer_state = best_optimizer_state if best_optimizer_state is not None else optimizer.state_dict() input_ = torch.tensor(self.init_data.T).float().to(device) self.coords = model.get_code_embedding(input_) self.nn_loss = best_test_loss
# -------------------------------------
[docs] def create_vae_embedding_( self, continue_learning=0, epochs=50, lr=1e-3, seed=42, batch_size=32, enc_kwargs=None, dec_kwargs=None, feature_dropout=0.2, kld_weight=1, train_size=0.8, inter_dim=128, verbose=True, log_every=10, **kwargs, ): """Create variational autoencoder embedding. .. deprecated:: This method is deprecated. Use ``create_flexible_ae_embedding_`` instead for more flexibility and advanced loss functions. Parameters ---------- continue_learning : int, default=0 Whether to continue training existing model. epochs : int, default=50 Number of training epochs. lr : float, default=1e-3 Learning rate. seed : int, default=42 Random seed. batch_size : int, default=32 Batch size for training. enc_kwargs : dict, optional Encoder configuration. dec_kwargs : dict, optional Decoder configuration. feature_dropout : float, default=0.2 Feature dropout rate. kld_weight : float, default=1 Weight for KL divergence loss term. train_size : float, default=0.8 Training set fraction. inter_dim : int, default=128 Hidden layer dimension. verbose : bool, default=True Print training progress. log_every : int, default=10 Logging frequency. **kwargs Additional keyword arguments. Raises ------ ValueError If parameters are invalid. ImportError If PyTorch is not installed. Notes ----- This method is deprecated in favor of ``create_flexible_ae_embedding_`` which provides more flexibility and advanced loss functions.""" # --------------------------------------------------------------------------- # Validate parameters check_positive(epochs=epochs, lr=lr, inter_dim=inter_dim, log_every=log_every) check_nonnegative(kld_weight=kld_weight) # Import torch dependencies (optional dependency) try: import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader from .neural import VAE except (ImportError, OSError): raise ImportError( "PyTorch is required for autoencoder methods. " "Please install it with: pip install torch" ) # Get data loaders and device train_loader, test_loader, device = self._prepare_data_loaders( batch_size=batch_size, train_size=train_size, seed=seed ) # Set random seed again for model initialization torch.manual_seed(seed) if not continue_learning: # create a model from `VAE` autoencoder class # load it to the specified device, either gpu or cpu model = VAE( orig_dim=len(self.init_data), inter_dim=inter_dim, code_dim=self.dim, enc_kwargs=enc_kwargs, dec_kwargs=dec_kwargs, device=device, ) model = model.to(device) else: model = self.nnmodel # create an optimizer object # Adam optimizer with learning rate lr optimizer = optim.Adam(model.parameters(), lr=lr) if continue_learning and hasattr(self, "_optimizer_state"): optimizer.load_state_dict(self._optimizer_state) for param_group in optimizer.param_groups: param_group["lr"] = lr # BCE error loss # criterion = nn.BCELoss(reduction='sum') criterion = nn.MSELoss() # --------------------------------------------------------------------------- f_dropout = nn.Dropout(feature_dropout) best_test_epoch = -1 if continue_learning: best_test_loss = self.nn_loss if hasattr(self, 'nn_loss') else 1e10 best_test_model = model.state_dict().copy() best_optimizer_state = optimizer.state_dict() else: best_test_loss = 1e10 best_test_model = None best_optimizer_state = None for epoch in range(epochs): loss = 0 loss1 = 0 loss2 = 0 for batch_features, indices in train_loader: # NeuroDataset returns (sample, idx) batch_features = batch_features.to(device) # reset the gradients back to zero # PyTorch accumulates gradients on subsequent backward passes optimizer.zero_grad() # compute reconstructions data = f_dropout(torch.ones(batch_features.shape).to(device)) * batch_features data = data.to(device).float() # Ensure float32 reconstruction, mu, logvar = model(data) # compute training reconstruction loss mse_loss = criterion(reconstruction, data) kld_loss = -0.5 * torch.sum( 1 + logvar - mu.pow(2) - logvar.exp() ) # * train_dataset.__len__()/batch_size train_loss = mse_loss + kld_weight * kld_loss # compute accumulated gradients train_loss.backward() # perform parameter update based on current gradients optimizer.step() # add the mini-batch training loss to epoch loss loss += train_loss.item() loss1 += mse_loss.item() loss2 += kld_loss.item() # compute the epoch training loss loss = loss / len(train_loader) loss1 = loss1 / len(train_loader) loss2 = loss2 / len(train_loader) # display the epoch training loss # display the epoch training loss if (epoch + 1) % log_every == 0: # compute loss on test part tloss = 0 for batch_features, indices in test_loader: # NeuroDataset returns (sample, idx) data = f_dropout(torch.ones(batch_features.shape).to(device)) * batch_features data = data.to(device).float() # Ensure float32 reconstruction, mu, logvar = model(data) # compute training reconstruction loss mse_loss = criterion(reconstruction, data) kld_loss = -0.5 * torch.sum( 1 + logvar - mu.pow(2) - logvar.exp() ) # * train_dataset.__len__()/batch_size test_loss = mse_loss + kld_weight * kld_loss tloss += test_loss.item() # compute the epoch test loss tloss = tloss / len(test_loader) # Track best model if tloss < best_test_loss: best_test_loss = tloss best_test_epoch = epoch + 1 best_test_model = model.state_dict().copy() best_optimizer_state = optimizer.state_dict() if verbose: print( f"epoch : {epoch + 1}/{epochs}, train loss = {loss:.8f}, test loss = {tloss:.8f}" ) if verbose: if best_test_epoch != epochs: print(f"best model: epoch {best_test_epoch}") # Load best model weights if best_test_model is not None: model.load_state_dict(best_test_model) self.nnmodel = model self._optimizer_state = best_optimizer_state if best_optimizer_state is not None else optimizer.state_dict() input_ = torch.tensor(self.init_data.T).float().to(device) self.coords = model.get_code_embedding(input_) self.nn_loss = best_test_loss
# -------------------------------------
[docs] def create_flexible_ae_embedding_( self, architecture="ae", # "ae" or "vae" continue_learning=0, epochs=50, lr=1e-3, seed=42, batch_size=32, enc_kwargs=None, dec_kwargs=None, feature_dropout=0.2, train_size=0.8, inter_dim=100, verbose=True, loss_components=None, log_every=1, device=None, logger=None, labels=None, ): """Create flexible autoencoder embedding with modular loss composition. Parameters ---------- architecture : str, default="ae" Architecture type: "ae" for standard autoencoder, "vae" for variational. continue_learning : int, default=0 Whether to continue training existing model. epochs : int, default=50 Number of training epochs. lr : float, default=1e-3 Learning rate. seed : int, default=42 Random seed for reproducibility. batch_size : int, default=32 Batch size for training. enc_kwargs : dict, optional Encoder configuration (e.g., dropout). dec_kwargs : dict, optional Decoder configuration. feature_dropout : float, default=0.2 Dropout rate for input features during training. train_size : float, default=0.8 Fraction of data for training. inter_dim : int, default=100 Hidden layer dimension. verbose : bool, default=True Whether to print training progress. loss_components : list of dict, optional Loss component configurations. Each dict should contain: - "name": str, the loss type - "weight": float, the loss weight - Additional parameters specific to each loss type If None, uses standard reconstruction loss for AE or reconstruction + KLD for VAE. log_every : int, default=1 Log frequency (epochs). device : torch.device, optional Device to run on. logger : logging.Logger, optional Logger instance. labels : array-like, optional Integer class labels, shape (n_samples,). Passed to ClassificationLoss during training. If None, classification losses contribute zero. Examples -------- >>> import numpy as np >>> from driada.dim_reduction.data import MVData >>> data = np.random.randn(50, 500) # 50 features, 500 samples >>> mvdata = MVData(data) >>> >>> # Standard autoencoder with correlation loss >>> emb = mvdata.get_embedding( ... method="flexible_ae", ... architecture="ae", ... dim=10, ... loss_components=[ ... {"name": "reconstruction", "weight": 1.0}, ... {"name": "correlation", "weight": 0.1} ... ], ... epochs=5, # Quick test ... verbose=False ... ) >>> # β-VAE for disentanglement >>> emb = mvdata.get_embedding( ... method="flexible_ae", ... architecture="vae", ... dim=10, ... loss_components=[ ... {"name": "reconstruction", "weight": 1.0}, ... {"name": "beta_vae", "weight": 1.0, "beta": 4.0} ... ], ... epochs=5, # Quick test ... verbose=False ... ) >>> # Recreate deprecated 'ae' method with correlation loss >>> # Old: method="ae", add_corr_loss=True, corr_hyperweight=0.1 >>> # New: >>> emb = mvdata.get_embedding( ... method="flexible_ae", ... architecture="ae", ... dim=10, ... loss_components=[ ... {"name": "reconstruction", "weight": 1.0}, ... {"name": "correlation", "weight": 0.1} ... ], ... epochs=5, # Quick test ... verbose=False ... ) >>> # Recreate deprecated 'ae' method with MI loss >>> # Old: method="ae", add_mi_loss=True, mi_hyperweight=0.1, minimize_mi_data=data >>> # New: >>> emb = mvdata.get_embedding( ... method="flexible_ae", ... architecture="ae", ... dim=10, ... loss_components=[ ... {"name": "reconstruction", "weight": 1.0}, ... {"name": "orthogonality", "weight": 0.1, "external_data": data} ... ], ... epochs=5, # Quick test ... verbose=False ... ) >>> # Recreate deprecated 'vae' method >>> # Old: method="vae", kld_weight=0.1 >>> # New: >>> emb = mvdata.get_embedding( ... method="flexible_ae", ... architecture="vae", ... dim=10, ... loss_components=[ ... {"name": "reconstruction", "weight": 1.0}, ... {"name": "beta_vae", "weight": 1.0, "beta": 0.1} ... ], ... epochs=5, # Quick test ... verbose=False ... ) Raises ------ ValueError If parameters are invalid or architecture not in ["ae", "vae"]. ImportError If PyTorch is not installed. Notes ----- This method provides a flexible framework for various autoencoder architectures with modular loss composition. It replaces the deprecated ``create_ae_embedding_`` and ``create_vae_embedding_`` methods.""" # Validate parameters check_positive(epochs=epochs, lr=lr, inter_dim=inter_dim, log_every=log_every) if architecture not in ["ae", "vae"]: raise ValueError(f"architecture must be 'ae' or 'vae', got '{architecture}'") # Import torch dependencies try: import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader from .flexible_ae import ModularAutoencoder, FlexibleVAE except (ImportError, OSError): raise ImportError( "PyTorch is required for autoencoder methods. " "Please install it with: pip install torch" ) # Get data loaders and device train_loader, test_loader, device_to_use = self._prepare_data_loaders( batch_size=batch_size, train_size=train_size, seed=seed ) # Prepare labels if provided (for classification loss) labels_tensor = None if labels is not None: labels_tensor = torch.tensor(labels, dtype=torch.long) if device is None: device = device_to_use # Set random seed for model initialization torch.manual_seed(seed) if verbose: print("device:", device) # Default loss components if none specified if loss_components is None: loss_components = [] # Always add reconstruction loss loss_components.append({"name": "reconstruction", "weight": 1.0, "loss_type": "mse"}) # For VAE, add standard KLD loss if architecture == "vae": loss_components.append({"name": "beta_vae", "weight": 1.0, "beta": 1.0}) if not continue_learning: # Create model based on architecture if architecture == "ae": model = ModularAutoencoder( input_dim=self.init_data.shape[0], latent_dim=self.dim, hidden_dim=inter_dim, encoder_config=enc_kwargs, decoder_config=dec_kwargs, loss_components=loss_components, device=device, logger=logger, ) elif architecture == "vae": model = FlexibleVAE( input_dim=self.init_data.shape[0], latent_dim=self.dim, hidden_dim=inter_dim, encoder_config=enc_kwargs, decoder_config=dec_kwargs, loss_components=loss_components, device=device, logger=logger, ) else: raise ValueError(f"Unknown architecture: {architecture}") model = model.to(device) else: model = self.nnmodel # Loss component nn.Module attrs are registered on the model, # so model.parameters() includes them automatically optimizer = optim.Adam(model.parameters(), lr=lr) if continue_learning and hasattr(self, "_optimizer_state"): optimizer.load_state_dict(self._optimizer_state) # Apply the (potentially new) learning rate for param_group in optimizer.param_groups: param_group["lr"] = lr # Feature dropout f_dropout = nn.Dropout(feature_dropout) best_test_epoch = -1 if continue_learning: best_test_loss = self.nn_loss best_test_model = model.state_dict().copy() best_optimizer_state = optimizer.state_dict() else: best_test_loss = 1e10 best_test_model = None best_optimizer_state = None # Training loop for epoch in range(epochs): # Training phase model.train() train_losses = [] for batch_features, indices in train_loader: batch_features = batch_features.to(device) optimizer.zero_grad() # Apply feature dropout noisy_features = f_dropout(torch.ones_like(batch_features)) * batch_features noisy_features = noisy_features.float() # Compute loss batch_labels = labels_tensor[indices] if labels_tensor is not None else None total_loss, loss_dict = model.compute_loss( noisy_features, indices=indices, labels=batch_labels ) # Backward pass total_loss.backward() optimizer.step() train_losses.append(loss_dict) # Average training losses avg_train_losses = {} for key in train_losses[0].keys(): avg_train_losses[key] = np.mean([d[key] for d in train_losses]) # Validation phase (no feature dropout — evaluate on clean data) if (epoch + 1) % log_every == 0: model.eval() test_losses = [] with torch.no_grad(): for batch_features, indices in test_loader: batch_features = batch_features.to(device).float() # Compute loss on clean data batch_labels = labels_tensor[indices] if labels_tensor is not None else None total_loss, loss_dict = model.compute_loss( batch_features, indices=indices, labels=batch_labels ) test_losses.append(loss_dict) # Average test losses avg_test_losses = {} for key in test_losses[0].keys(): avg_test_losses[key] = np.mean([d[key] for d in test_losses]) test_loss = avg_test_losses["total_loss"] # Track best model if test_loss < best_test_loss: best_test_loss = test_loss best_test_epoch = epoch + 1 best_test_model = model.state_dict().copy() best_optimizer_state = optimizer.state_dict() if verbose: train_loss = avg_train_losses["total_loss"] print( f"epoch : {epoch + 1}/{epochs}, " f"train loss = {train_loss:.8f}, " f"test loss = {test_loss:.8f}" ) # Print individual loss components if len(model.losses) > 1: loss_str = ", ".join( [ f"{k}: {v:.6f}" for k, v in avg_test_losses.items() if k != "total_loss" and not k.endswith("_weighted") ] ) print(f" Components: {loss_str}") if verbose and best_test_epoch != epochs: print(f"best model: epoch {best_test_epoch}") # Load best model if best_test_model is not None: model.load_state_dict(best_test_model) # Store model, optimizer state (matched to best model), and extract embeddings self.nnmodel = model self._optimizer_state = best_optimizer_state if best_optimizer_state is not None else optimizer.state_dict() input_ = torch.tensor(self.init_data.T).float().to(device) self.coords = model.get_latent_representation(input_) self.nn_loss = best_test_loss
[docs] def continue_learning(self, add_epochs, **kwargs): """Continue training an existing autoencoder model. Allows resuming training of a previously trained autoencoder or VAE for additional epochs with potentially different parameters. Parameters ---------- add_epochs : int Number of additional epochs to train. Must be positive. **kwargs Additional keyword arguments to pass to the training method. These override the original training parameters (e.g., lr for fine-tuning with a lower learning rate). Raises ------ ValueError If add_epochs is not positive or method is not DL-based. AttributeError If no model has been trained yet. Notes ----- This method requires that an autoencoder model was previously trained using one of the DL-based methods (ae, vae, flexible_ae). Training parameters from the original call (feature_dropout, batch_size, etc.) are preserved automatically. Only parameters explicitly passed in kwargs are overridden.""" # Validate parameters check_positive(add_epochs=add_epochs) # Check if this is a DL-based method if self.all_params["e_method_name"] not in ["ae", "vae", "flexible_ae"]: raise ValueError( "continue_learning only works with DL-based methods (ae, vae, flexible_ae)" ) # Check if model exists if not hasattr(self, "nnmodel") or self.nnmodel is None: raise AttributeError("No model to continue training. Train a model first.") # Restore original training parameters so they don't reset to defaults. # User kwargs override these. _training_params = [ "feature_dropout", "batch_size", "train_size", "seed", "inter_dim", "architecture", "loss_components", "log_every", "enc_kwargs", "dec_kwargs", ] merged = {} for p in _training_params: if hasattr(self, p): merged[p] = getattr(self, p) merged.update(kwargs) # Get the appropriate training method fn = getattr(self, "create_" + self.all_params["e_method_name"] + "_embedding_") # Continue training with additional epochs fn(continue_learning=1, epochs=add_epochs, **merged)
[docs] def to_mvdata(self): """Convert embedding coordinates to MVData for further processing. This allows embeddings to be used as input for additional dimensionality reduction or analysis steps, enabling recursive embedding pipelines. Label Handling -------------- Graph-based dimensionality reduction methods (LLE, Laplacian Eigenmaps, Isomap, etc.) may remove disconnected nodes during preprocessing, resulting in fewer points in the embedding than in the original data. This method handles labels in the following way: 1. If all points are preserved: Labels are passed through unchanged. 2. If nodes were filtered and a node mapping exists: Labels are filtered to match only the kept nodes, preserving the correspondence. 3. If nodes were filtered but no mapping is available: Labels are set to None to avoid misalignment between data points and labels. Returns ------- MVData An MVData object containing the embedding coordinates as data. Labels will be: - Original labels if all points preserved - Filtered labels matching kept nodes if mapping available - None if nodes were removed but mapping unavailable Raises ------ ValueError If embedding has not been built yet. Examples -------- >>> import numpy as np >>> from driada.dim_reduction.data import MVData >>> np.random.seed(42) >>> high_dim_data = np.random.randn(20, 500) # 20 features, 500 samples >>> labels = np.random.choice(['A', 'B', 'C', 'D'], size=500) >>> mvdata = MVData(high_dim_data, labels=labels) >>> # PCA preserves all points >>> pca_emb = mvdata.get_embedding(method='pca', dim=2, verbose=False) >>> pca_mvdata = pca_emb.to_mvdata() >>> assert len(pca_mvdata.labels) == 500 # All labels preserved >>> # LLE might remove disconnected nodes >>> lle_emb = mvdata.get_embedding(method='lle', dim=2, nn=2) >>> lle_mvdata = lle_emb.to_mvdata() >>> # Labels either filtered to match remaining nodes or None >>> assert lle_mvdata.labels is None or len(lle_mvdata.labels) == lle_mvdata.n_points """ # Import here to avoid circular dependency from .data import MVData if self.coords is None: raise ValueError("Embedding has not been built yet. Call build() first.") # Handle case where graph preprocessing removed nodes labels_to_use = self.labels # Check if the number of points in coords differs from labels if self.labels is not None and self.coords.shape[1] != len(self.labels): # If we have a graph with node mapping, use it if hasattr(self, "graph") and self.graph is not None: if ( hasattr(self.graph, "_init_to_final_node_mapping") and self.graph._init_to_final_node_mapping ): # Filter labels to match the nodes that remain after preprocessing node_mapping = self.graph._init_to_final_node_mapping # Get the original indices that were kept kept_indices = sorted(node_mapping.keys()) labels_to_use = self.labels[kept_indices] else: # No explicit mapping, but coords has fewer points # This can happen with graph methods that remove disconnected components # In this case, we can't recover which labels to keep, so set to None labels_to_use = None else: # No graph, but mismatch in sizes - set labels to None labels_to_use = None # Create MVData with embedding coordinates # coords shape is (embedding_dim, n_points), which matches MVData format return MVData( data=self.coords, labels=labels_to_use, distmat=None, # Distance matrix would need to be recomputed for embedding space rescale_rows=False, # Embeddings are already scaled appropriately data_name=f"{self.e_method_name}_embedding", )