Source code for PNMF.initialization

"""
Initialization strategies for PNMF.

Provides sklearn-compatible initialization methods for NMF:
- 'random': Random initialization with scaling
- 'nndsvd': Nonnegative Double SVD (sparse)
- 'nndsvda': NNDSVD with average fill (dense)
- 'nndsvdar': NNDSVD with random fill (dense, faster)
- 'k-means': K-means clustering based

Reference:
    C. Boutsidis, E. Gallopoulos (2008)
    "SVD based initialization: A head start for nonnegative matrix factorization"
    Pattern Recognition 41(4):1350-1362
    https://doi.org/10.1016/j.patcog.2007.09.010
"""

import numpy as np
import torch
from typing import Tuple, Optional, Union
from sklearn.decomposition import TruncatedSVD
from sklearn.cluster import KMeans
from scipy import sparse


def _norm(x: np.ndarray) -> float:
    """Dot product-based Euclidean norm."""
    return np.sqrt(np.dot(x.ravel(), x.ravel()))


def _scale_log_factors(
    exp_F: np.ndarray,
    target_std: float = 1.0,
    eps: float = 1e-8
) -> np.ndarray:
    """
    Scale exp_F so that log(exp_F) has approximately N(0, target_std^2) distribution.

    This ensures that the variational mean μ = log(exp_F) is on a similar scale
    across different datasets and initialization methods. A standard normal
    distribution has 97.5% of values between -1.96 and 1.96.

    Parameters
    ----------
    exp_F : ndarray of shape (n_samples, n_components)
        Expected latent factors in exp-space (before log transform).
    target_std : float, default=1.0
        Target standard deviation for log(exp_F). Setting to 1.0 means
        μ ~ N(0, 1) approximately.
    eps : float, default=1e-8
        Small value to add before log transform to handle zeros.

    Returns
    -------
    exp_F_scaled : ndarray of same shape as exp_F
        Scaled expected factors such that log(exp_F_scaled) has std ~ target_std.

    Notes
    -----
    The scaling is done by working in log-space directly:
    1. Computing log_F = log(exp_F + eps)
    2. Computing current_mean and current_std of log_F
    3. Standardizing: z = (log_F - current_mean) / current_std
    4. Scaling to target: log_F_scaled = z * target_std
    5. Exponentiating back: exp_F_scaled = exp(log_F_scaled)

    This ensures the log-space values have exactly the target standard deviation
    and approximately zero mean.
    """
    # Compute log-space values
    log_F = np.log(exp_F + eps)

    # Compute current statistics
    current_mean = log_F.mean()
    current_std = log_F.std()

    # If std is very small or zero, don't scale (already very concentrated)
    if current_std < 1e-6:
        return exp_F

    # Standardize and rescale to target standard deviation
    log_F_scaled = (log_F - current_mean) / current_std * target_std

    # Exponentiate back to exp-space
    return np.exp(log_F_scaled)


def _check_init_array(A: np.ndarray, shape: Tuple[int, int], name: str) -> np.ndarray:
    """Validate custom initialization array."""
    A = np.asarray(A)
    if A.shape != shape:
        raise ValueError(f"{name} has wrong shape: {A.shape}, expected {shape}")
    if np.min(A) < 0:
        raise ValueError(f"{name} must be non-negative")
    if np.max(A) == 0:
        raise ValueError(f"{name} cannot be all zeros")
    return A


def _initialize_random(
    X: np.ndarray,
    n_components: int,
    random_state: Optional[int] = None
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Random initialization following sklearn's approach.

    Parameters
    ----------
    X : ndarray of shape (n_samples, n_features)
        Input data matrix.
    n_components : int
        Number of components.
    random_state : int or None
        Random seed.

    Returns
    -------
    W : ndarray of shape (n_features, n_components)
        Loadings matrix initialization.
    exp_F : ndarray of shape (n_samples, n_components)
        Expected latent factors initialization (exp-space).
    """
    rng = np.random.RandomState(random_state)
    n_samples, n_features = X.shape

    # Scale: sqrt(X.mean() / n_components) as in sklearn
    avg = np.sqrt(X.mean() / n_components)

    # Initialize W (D x L) - corresponds to H.T in sklearn
    W = avg * np.abs(rng.standard_normal(size=(n_features, n_components)))

    # Initialize exp(F) (N x L) - corresponds to W in sklearn
    exp_F = avg * np.abs(rng.standard_normal(size=(n_samples, n_components)))

    return W, exp_F


def _initialize_nndsvd(
    X: np.ndarray,
    n_components: int,
    variant: str = 'nndsvd',
    random_state: Optional[int] = None
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Nonnegative Double SVD initialization.

    Implements the NNDSVD algorithm from Boutsidis & Gallopoulos (2008)
    using TruncatedSVD for efficiency with large sparse matrices.

    Parameters
    ----------
    X : ndarray of shape (n_samples, n_features)
        Non-negative input data matrix.
    n_components : int
        Number of components (must be <= min(n_samples, n_features)).
    variant : {'nndsvd', 'nndsvda', 'nndsvdar'}
        NNDSVD variant:
        - 'nndsvd': Basic NNDSVD (sparse, many zeros)
        - 'nndsvda': Fill zeros with average of X (dense)
        - 'nndsvdar': Fill zeros with small random values (dense, faster)
    random_state : int or None
        Random seed (used for 'nndsvdar').

    Returns
    -------
    W : ndarray of shape (n_features, n_components)
        Loadings matrix initialization.
    exp_F : ndarray of shape (n_samples, n_components)
        Expected latent factors initialization.

    References
    ----------
    C. Boutsidis, E. Gallopoulos (2008)
    "SVD based initialization: A head start for nonnegative matrix factorization"
    Pattern Recognition 41(4):1350-1362
    """
    if n_components > min(X.shape):
        raise ValueError(
            f"n_components ({n_components}) must be <= min(n_samples, n_features) "
            f"({min(X.shape)}) for NNDSVD initialization"
        )

    rng = np.random.RandomState(random_state)
    n_samples, n_features = X.shape
    eps = 1e-6

    # Convert to sparse matrix for efficiency if X is sparse
    # Use CSR format for efficient arithmetic and row slicing
    X_sparse = sparse.csr_matrix(X)

    # Step 1: Compute Truncated SVD (faster for large/sparse matrices)
    # Use 7 iterations by default (sklearn's default, works well in practice)
    # For very large sparse matrices, this is much faster than full SVD
    svd = TruncatedSVD(
        n_components=n_components,
        random_state=random_state,
        n_iter=7  # sklearn's default, good balance of speed and accuracy
    )
    U = svd.fit_transform(X_sparse)  # (n_samples, n_components)
    S = svd.singular_values_  # (n_components,)
    Vt = svd.components_      # (n_components, n_features)

    # Initialize output matrices
    # Note: In PNMF, we want W (D x L) and exp(F) (N x L)
    # Sklearn returns W_skl (N x L) and H_skl (L x D)
    # Mapping: exp(F) <- W_skl, W <- H_skl.T
    W = np.zeros((n_features, n_components), dtype=X.dtype)  # H.T in sklearn
    exp_F = np.zeros((n_samples, n_components), dtype=X.dtype)  # W in sklearn

    # Step 2: Leading singular triplet (j=0) is already non-negative
    W[:, 0] = np.sqrt(S[0]) * np.abs(Vt[0, :].T)   # First row of Vt
    exp_F[:, 0] = np.sqrt(S[0]) * np.abs(U[:, 0])   # First column of U

    # Step 3: Process remaining components (j=1 to n_components-1)
    for j in range(1, n_components):
        x = U[:, j]      # j-th left singular vector
        y = Vt[j, :]     # j-th right singular vector

        # Extract positive and negative parts
        x_p = np.maximum(x, 0)
        x_n = np.abs(np.minimum(x, 0))
        y_p = np.maximum(y, 0)
        y_n = np.abs(np.minimum(y, 0))

        # Compute norms
        x_p_nrm = _norm(x_p)
        x_n_nrm = _norm(x_n)
        y_p_nrm = _norm(y_p)
        y_n_nrm = _norm(y_n)

        # Compute magnitudes
        m_p = x_p_nrm * y_p_nrm
        m_n = x_n_nrm * y_n_nrm

        # Choose the better update
        if m_p > m_n:
            u = x_p / (x_p_nrm + 1e-12)
            v = y_p / (y_p_nrm + 1e-12)
            sigma = m_p
        else:
            u = x_n / (x_n_nrm + 1e-12)
            v = y_n / (y_n_nrm + 1e-12)
            sigma = m_n

        lbd = np.sqrt(S[j] * sigma)
        exp_F[:, j] = lbd * u    # W[:, j] in sklearn notation
        W[:, j] = lbd * v.T      # H[j, :].T in sklearn notation

    # Step 4: Truncate small values to zero
    W[W < eps] = 0
    exp_F[exp_F < eps] = 0

    # Step 5: Apply variant-specific zero-filling
    if variant == 'nndsvd':
        pass  # Keep zeros (sparse result)
    elif variant == 'nndsvda':
        # Fill zeros with average of X
        avg = X.mean()
        W[W == 0] = avg
        exp_F[exp_F == 0] = avg
    elif variant == 'nndsvdar':
        # Fill zeros with small random values
        avg = X.mean()
        W_zeros = W == 0
        exp_F_zeros = exp_F == 0

        # Small random values: avg * |N(0,1)| / 100
        W[W_zeros] = np.abs(avg * rng.standard_normal(size=W_zeros.sum()) / 100)
        exp_F[exp_F_zeros] = np.abs(avg * rng.standard_normal(size=exp_F_zeros.sum()) / 100)
    else:
        raise ValueError(f"Unknown NNDSVD variant: {variant}")

    return W, exp_F


def _initialize_kmeans(
    X: np.ndarray,
    n_components: int,
    random_state: Optional[int] = None,
    max_iter: int = 100
) -> Tuple[np.ndarray, np.ndarray]:
    """
    K-means based initialization.

    Uses cluster centroids and soft assignments to initialize W and F.

    Parameters
    ----------
    X : ndarray of shape (n_samples, n_features)
        Input data matrix.
    n_components : int
        Number of clusters/components.
    random_state : int or None
        Random seed for k-means.
    max_iter : int
        Maximum iterations for k-means.

    Returns
    -------
    W : ndarray of shape (n_features, n_components)
        Loadings matrix initialization (cluster centroids transposed).
    exp_F : ndarray of shape (n_samples, n_components)
        Expected latent factors (soft assignment weights).

    Notes
    -----
    This method:
    1. Runs k-means to find cluster centroids
    2. Uses centroids as W (after appropriate scaling)
    3. Uses soft assignments based on distance to centroids for exp(F)
    """
    n_samples, n_features = X.shape

    # Run k-means
    kmeans = KMeans(
        n_clusters=n_components,
        random_state=random_state,
        max_iter=max_iter,
        n_init=10
    )
    labels = kmeans.fit_predict(X)
    centroids = kmeans.cluster_centers_  # (n_components, n_features)

    # W: transpose of centroids (D x L)
    W = centroids.T  # (n_features, n_components)

    # exp_F: soft assignment weights (N x L)
    # Compute distances to all centroids
    from sklearn.metrics.pairwise import euclidean_distances
    distances = euclidean_distances(X, centroids)  # (n_samples, n_components)

    # Convert distances to weights (closer = higher weight)
    # Using exponential kernel: exp(-distance / mean_distance)
    mean_dist = distances.mean()
    exp_F = np.exp(-distances / (mean_dist + 1e-12))

    # Normalize rows
    row_sums = exp_F.sum(axis=1, keepdims=True)
    exp_F = exp_F / (row_sums + 1e-12)

    # Scale to match magnitude of X
    scale = X.mean() / exp_F.mean()
    exp_F = exp_F * scale
    W = W * scale

    return W, exp_F


[docs] def initialize_factors( X: Union[np.ndarray, torch.Tensor], n_components: int, init: Optional[str] = None, random_state: Optional[int] = None, scale_log_mu: bool = True, target_std: float = 1.0 ) -> Tuple[np.ndarray, np.ndarray]: """ Initialize W and exp(F) matrices for PNMF. This function provides sklearn-compatible initialization for PNMF. The returned matrices can be used to initialize the model parameters. Parameters ---------- X : ndarray or torch.Tensor of shape (n_samples, n_features) Non-negative input data matrix. n_components : int Number of latent components. init : {None, 'random', 'nndsvd', 'nndsvda', 'nndsvdar', 'k-means'}, default=None Initialization method: - None: Auto-select 'nndsvda' if n_components <= min(n_samples, n_features), otherwise 'random'. - 'random': Non-negative random matrices, scaled with sqrt(X.mean() / n_components). - 'nndsvd': Nonnegative Double SVD (better for sparseness). - 'nndsvda': NNDSVD with zeros filled with average of X (better for dense data). - 'nndsvdar': NNDSVD with zeros filled with small random values (faster dense). - 'k-means': K-means clustering based initialization. random_state : int or None Random seed for reproducibility. scale_log_mu : bool, default=True If True, scales exp_F so that log(exp_F) has approximately N(0, 1) distribution. This ensures the variational mean μ is on a consistent scale across different datasets and initialization methods. target_std : float, default=1.0 Target standard deviation for log(exp_F) when scale_log_mu=True. Returns ------- W : ndarray of shape (n_features, n_components) Loadings matrix initialization (for W parameter). exp_F : ndarray of shape (n_samples, n_components) Expected latent factors initialization (for exp(F) where F is in log-space). Examples -------- >>> import numpy as np >>> from PNMF.initialization import initialize_factors >>> X = np.random.poisson(5, size=(100, 50)).astype(np.float32) >>> W, exp_F = initialize_factors(X, n_components=5, init='nndsvda', random_state=42) >>> print(W.shape) # (50, 5) >>> print(exp_F.shape) # (100, 5) Notes ----- For PNMF, the model structure is: X ≈ W @ exp(F).T where: - W is (n_features, n_components) - loadings - F is the latent factor matrix (n_samples, n_components) in log-space - exp(F) is the expected value of the latent factors The variational distribution q(F) is Gaussian with mean μ and scale σ. To initialize from exp_F: μ = log(exp_F) (or log(exp_F + eps) if exp_F has zeros) σ can be initialized to a small value (e.g., 0.1) """ # Convert to numpy if needed if isinstance(X, torch.Tensor): X_np = X.detach().cpu().numpy() else: X_np = np.asarray(X) # Check for non-negative input if np.min(X_np) < 0: raise ValueError("Input matrix X must be non-negative for NMF initialization") n_samples, n_features = X_np.shape # Auto-select initialization if None if init is None: if n_components <= min(n_samples, n_features): init = 'nndsvd' else: init = 'random' # Dispatch to appropriate initialization if init == 'random': W, exp_F = _initialize_random(X_np, n_components, random_state) elif init in ('nndsvd', 'nndsvda', 'nndsvdar'): W, exp_F = _initialize_nndsvd(X_np, n_components, init, random_state) elif init == 'k-means': W, exp_F = _initialize_kmeans(X_np, n_components, random_state) else: raise ValueError( f"Unknown init parameter: '{init}'. " f"Valid options are: None, 'random', 'nndsvd', 'nndsvda', 'nndsvdar', 'k-means'" ) # Scale exp_F so that log(exp_F) ~ N(0, target_std^2) # This ensures μ is on a consistent scale across different datasets if scale_log_mu: exp_F_before = exp_F.copy() exp_F = _scale_log_factors(exp_F, target_std=target_std) # Compute the scaling factor applied to exp_F scale_factor = exp_F.mean() / (exp_F_before.mean() + 1e-12) # Adjust W inversely to maintain X ≈ W @ exp(F).T approximation # If exp_F is scaled by s, W should be scaled by 1/s W = W / scale_factor return W, exp_F