Source code for qdiv.stats.distance_tests

"""
Distance tests: Mantel, Permanova, Gower

Public API:
    - mantel
    - permanova
    - gower
"""

import pandas as pd
import numpy as np
from numpy.linalg import matrix_rank, pinv
import warnings
from typing import Literal, List, Union, Dict, Any, Sequence
from ..utils import get_df
from pandas.api.types import (
    is_numeric_dtype,
    is_datetime64_any_dtype,
    is_bool_dtype,
    is_categorical_dtype,
    CategoricalDtype,
)

__all__ = [
    "mantel",
    "permanova",
    "gower"
]

# -----------------------------------------------------------------------------
# Mantel
# -----------------------------------------------------------------------------
def _rank_vector(a: np.ndarray) -> np.ndarray:
    """
    Fast rank (average ranks for ties) using NumPy only.
    """
    # argsort twice trick
    order = np.argsort(a, kind="mergesort")                  # stable
    ranks = np.empty_like(order, dtype=float)
    ranks[order] = np.arange(1, a.size + 1, dtype=float)     # 1..m

    # handle ties -> average ranks
    # find groups of equal values in sorted order
    s = a[order]
    # run-length encoding
    diff = np.ones_like(s, dtype=bool)
    diff[1:] = s[1:] != s[:-1]
    idx_start = np.flatnonzero(diff)
    idx_end = np.r_[idx_start[1:], s.size]                   # exclusive end

    for b, e in zip(idx_start, idx_end):
        if e - b > 1:
            # average rank for the block b..e-1
            avg = (b + 1 + e) / 2.0
            ranks[order[b:e]] = avg
    return ranks

[docs] def mantel( dis1: pd.DataFrame, dis2: pd.DataFrame, method: Literal["spearman", "pearson", "absDist"] = "spearman", getOnlyStat: bool = False, permutations: int = 999, *, random_state: Union[int, np.random.Generator, None] = None, **kwargs, ) -> Union[float, List[float]]: """ Perform a Mantel test to assess the association between two dissimilarity matrices. The Mantel test evaluates whether pairs of samples that are close (or far apart) in one dissimilarity matrix tend to be close (or far apart) in another. The test statistic is computed by comparing the lower‑triangular entries of the two matrices, and statistical significance is assessed using a permutation test. For correlation-based methods, the association is quantified as a *dissimilarity* (1 − r), where r is the Pearson or Spearman correlation between the vectorized distance matrices. Parameters ---------- dis1 : pandas.DataFrame First square distance or dissimilarity matrix (samples × samples) with identical row and column labels. dis2 : pandas.DataFrame Second square distance or dissimilarity matrix (samples × samples) with identical row and column labels matching `dis1`. method : {'spearman', 'pearson', 'absDist'}, default='spearman' Measure used to quantify association between distance matrices: * 'spearman' : Spearman rank correlation between distances (reported as 1 − ρ). * 'pearson' : Pearson correlation between distances (reported as 1 − r). * 'absDist' : Mean absolute difference between corresponding distances. getOnlyStat : bool, default=False If True, return only the observed test statistic without performing permutations. permutations : int, default=999 Number of permutations used to approximate the null distribution. random_state : int | numpy.random.Generator | None Random seed or NumPy random generator for reproducible permutations. Returns ------- float or list [statistic, p_value] * If `getOnlyStat=True`, returns the observed statistic only. * Otherwise, returns a list containing the observed statistic and its permutation-based p-value. Notes ----- * The test uses only the lower triangular part of each distance matrix (excluding the diagonal), avoiding double counting of pairwise distances. * Sample labels are permuted in `dis1` while `dis2` is held fixed to generate the null distribution. * For correlation-based methods ('pearson', 'spearman'), the reported statistic is a *dissimilarity* (1 − r or 1 − ρ), so **smaller values indicate stronger association** between the two matrices. * p-values are computed using a standard permutation test with a +1 correction: (count + 1) / (permutations + 1). """ # ---- alias handling ---- if "seed" in kwargs: if random_state is not None: raise TypeError("Specify only one of 'random_state' or 'seed'.") random_state = kwargs.pop("seed") if kwargs: raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}") if dis1.isna().any().any() or dis2.isna().any().any(): raise ValueError("Mantel test does not support NaN values in distance matrices.") if not isinstance(dis1, pd.DataFrame) or not isinstance(dis2, pd.DataFrame): raise TypeError("dis1 and dis2 must be pandas DataFrames.") if dis1.shape != dis2.shape: raise ValueError("dis1 and dis2 must have the same shape.") if method not in {"spearman", "pearson", "absDist"}: raise ValueError("method must be 'spearman', 'pearson', or 'absDist'.") # Require identical labels if not dis1.index.equals(dis1.columns): raise ValueError("dis1 must have identical row/column labels") if not dis2.index.equals(dis2.columns): raise ValueError("dis2 must have identical row/column labels") if not dis1.index.equals(dis2.index): raise ValueError("dis1 and dis2 must have identical sample labels") # Align both matrices to the same order samples = dis1.index dis1 = dis1.loc[samples, samples] dis2 = dis2.loc[samples, samples] # --- convert to ndarray and extract lower triangle (k=-1) once --- A = dis1.to_numpy(dtype=float, copy=False) B = dis2.to_numpy(dtype=float, copy=False) n = A.shape[0] if n < 2: # no pairs return 0.0 if getOnlyStat else [0.0, 1.0] tril_i, tril_j = np.tril_indices(n, k=-1) v1 = A[tril_i, tril_j].astype(float, copy=True) v2 = B[tril_i, tril_j].astype(float, copy=True) # --- observed statistic --- if method == "absDist": obs = float(np.mean(np.abs(v1 - v2))) if getOnlyStat: return obs else: if method == "pearson": # z-score once v1_mean, v1_std = v1.mean(), v1.std(ddof=1) v2_mean, v2_std = v2.mean(), v2.std(ddof=1) z1 = (v1 - v1_mean) / v1_std z2 = (v2 - v2_mean) / v2_std obs_r = float((z1 @ z2) / (z1.size - 1)) else: # spearman r1 = _rank_vector(v1) r2 = _rank_vector(v2) r1_mean, r1_std = r1.mean(), r1.std(ddof=1) r2_mean, r2_std = r2.mean(), r2.std(ddof=1) z1 = (r1 - r1_mean) / r1_std z2 = (r2 - r2_mean) / r2_std obs_r = float((z1 @ z2) / (z1.size - 1)) # convert similarity -> dissimilarity like your code obs = 1.0 - obs_r if getOnlyStat: return obs # --- permutations (NumPy RNG, no pandas in loop) --- rng = random_state if isinstance(random_state, np.random.Generator) else np.random.default_rng(random_state) null_stats = np.empty(permutations, dtype=float) if method == "absDist": # absDist doesn't need z-scores for b in range(permutations): perm = rng.permutation(n) v1p = A[perm][:, perm][tril_i, tril_j] null_stats[b] = np.mean(np.abs(v1p - v2)) elif method == "pearson": # z2 fixed; z1 permutes as values permute (mean/std invariant) for b in range(permutations): perm = rng.permutation(n) v1p = A[perm][:, perm][tril_i, tril_j] z1p = (v1p - v1_mean) / v1_std r = (z1p @ z2) / (z1p.size - 1) null_stats[b] = 1.0 - r else: # spearman # r2 fixed; need ranks of v1_perm each time (ties handled) for b in range(permutations): perm = rng.permutation(n) v1p = A[perm][:, perm][tril_i, tril_j] r1p = _rank_vector(v1p) # Pearson on ranks r1p_m, r1p_s = r1p.mean(), r1p.std(ddof=1) z1p = (r1p - r1p_m) / r1p_s r = (z1p @ z2) / (z1p.size - 1) null_stats[b] = 1.0 - r # one-sided p-value (proportion of permuted stats <= observed), +1 correction p = (np.sum(null_stats <= obs) + 1) / (permutations + 1) return [obs, p]
# ----------------------------------------------------------------------------- # Permanova # -----------------------------------------------------------------------------
[docs] def permanova( dis: pd.DataFrame, meta: Union[pd.DataFrame, Dict[str, Any], Any], by: Union[str, List[str]], *, permutations: int = 999, include_interaction: bool = False, strata: Union[str, Sequence[str], None] = None, random_state: Union[int, np.random.Generator, None] = None, perm_scheme: Literal["labels", "freedman-lane"] = "freedman-lane", **kwargs, ) -> Dict[str, Any]: """ PERMANOVA (Anderson, 2001) implemented via projection matrices on the Gower‑centered distance matrix. This function fits a distance‑based linear model with one or two categorical factors (optionally including their interaction) and tests each term using permutation-based pseudo‑F statistics. Tests are *marginal (partial)*: each term is evaluated conditional on all other included terms. Permutation inference can be performed either by permuting sample labels or by permuting residuals from reduced models (Freedman–Lane scheme), with optional restriction of permutations within exchangeability blocks (strata). Parameters ---------- dis : (n x n) pandas.DataFrame Symmetric distance or dissimilarity matrix with identical row and column labels. Rows/columns correspond to samples. meta : pandas.DataFrame | dict | MicrobiomeData-like Sample metadata indexed by sample IDs matching `dis.index`. by : str or list[str] One or two column names in `meta` defining the categorical factor(s). permutations : int, default 999 Number of permutations used to approximate the null distribution. include_interaction : bool, default False If `by` contains two factors and both have more than one level, include and test their interaction term. strata : str | list[str] | None Column name(s) in `meta` defining exchangeability blocks. When given, permutations are restricted to occur *within* each stratum only (i.e. blocked permutations). random_state : int | numpy.random.Generator | None Random seed or generator for reproducible permutations. perm_scheme : {'labels', 'freedman-lane'}, default 'freedman-lane' Permutation scheme used to generate the null distribution: * 'labels': Classical label permutation. Sample labels (factor assignments) are permuted across samples while the distance matrix is kept fixed. When two factors are provided, their labels are permuted jointly, preserving observed factor combinations. Permutations may be restricted within strata if specified. * 'freedman-lane': Residual-based permutation (Freedman & Lane, 1983). For each tested term, residuals from the reduced model excluding that term are permuted (optionally within strata), added back to the fitted values of the reduced model, and the full model is refitted. This scheme yields valid partial tests in the presence of nuisance factors and allows testing main effects even when a factor is constant within strata. Returns ------- dict A dictionary with the following entries: * 'by': List of tested term names (main effects and, if included, interaction). * 'table': pandas.DataFrame with rows corresponding to model terms and the residual, and columns: ['df', 'SS', 'MS', 'F', 'p', 'R2']. * 'permutations': Number of permutations performed. * 'strata': List of strata column names used for restricted permutations, or None. * 'perm_scheme': The permutation scheme used ('labels' or 'freedman-lane'). Notes ----- * The analysis follows the geometric partitioning of sums of squares described by Anderson (2001), using projection (hat) matrices on the Gower‑centered distance matrix. * P‑values are estimated from the permutation distribution using a standard +1 correction: (count + 1) / (permutations + 1). * If a tested factor does not vary within strata under label permutation, the corresponding null distribution may be degenerate and p‑values will be returned as NaN. """ # ---- alias handling ---- if "seed" in kwargs: if random_state is not None: raise TypeError("Specify only one of 'random_state' or 'seed'.") random_state = kwargs.pop("seed") if kwargs: raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}") # ---- validation / alignment ---- M = get_df(meta, "meta") if not isinstance(M, pd.DataFrame) or M.empty: raise ValueError("`meta` must be a non-empty pandas DataFrame.") if not isinstance(dis, pd.DataFrame) or dis.shape[0] != dis.shape[1]: raise ValueError("`dis` must be a square pandas DataFrame.") if not all(dis.index == dis.columns): raise ValueError("`dis` must have identical row and column labels.") if isinstance(by, str): by = [by] if not (1 <= len(by) <= 2): raise ValueError("`by` must be a str or a list of length 1 or 2.") # Ensure categorical dtype for stable dummy coding for b in by: if b not in M.columns: raise ValueError(f"Column '{b}' not found in metadata.") if not isinstance(M[b].dtype, CategoricalDtype): M[b] = M[b].astype("category") # Normalize strata argument if strata is not None: if isinstance(strata, str): strata_cols = [strata] else: strata_cols = list(strata) if any(c not in M.columns for c in strata_cols): missing = [c for c in strata_cols if c not in M.columns] raise ValueError(f"strata columns not found in metadata: {missing}") else: strata_cols = None if perm_scheme == "labels" and strata_cols is not None: tested_terms = set(by) | (set([f"{by[0]}:{by[1]}"]) if len(by)==2 and include_interaction else set()) if any(s in tested_terms for s in strata_cols): warnings.warn( "The 'strata' include a tested term; its permutations are fixed. " "P-values for that term will be uninformative (1.0/NaN).", UserWarning ) # Align metadata to the distance matrix order (do not reorder dis) M = M.reindex(dis.index) required = list(by) + (strata_cols if strata_cols else []) missing_rows = M.index[M[required].isna().any(axis=1)] if len(missing_rows) > 0: raise ValueError(f"Missing required metadata for samples: {missing_rows.tolist()}") # ---- helpers ---- n = dis.shape[0] I = np.eye(n, dtype=float) def _rank(X: np.ndarray) -> int: return 0 if X.size == 0 else int(matrix_rank(X)) def _hat(X: np.ndarray) -> np.ndarray: if X.size == 0: return np.zeros((n, n), dtype=float) P = X @ pinv(X) # numerically stable pseudo-hat return (P + P.T) / 2.0 def _ss(H: np.ndarray, A: np.ndarray) -> float: return float(np.trace(H @ A)) def _dummies(col: str, meta_df: pd.DataFrame) -> np.ndarray: Z = pd.get_dummies(meta_df[col], drop_first=True) return Z.to_numpy(dtype=float) def _interaction_products(X1: np.ndarray, X2: np.ndarray) -> np.ndarray: if X1.size == 0 or X2.size == 0: return np.empty((n, 0), dtype=float) return np.hstack( [X1[:, [i]] * X2[:, [j]] for i in range(X1.shape[1]) for j in range(X2.shape[1])] ).astype(float) def _build_terms(meta_df: pd.DataFrame) -> list[tuple[str, np.ndarray]]: terms: list[tuple[str, np.ndarray]] = [] Z0 = np.ones((n, 1), dtype=float) # Intercept terms.append(("Intercept", Z0)) if len(by) == 1: terms.append((by[0], _dummies(by[0], meta_df))) else: v1, v2 = by X1 = _dummies(v1, meta_df) X2 = _dummies(v2, meta_df) terms.append((v1, X1)) terms.append((v2, X2)) if include_interaction and meta_df[v1].nunique() > 1 and meta_df[v2].nunique() > 1: X12 = _interaction_products(X1, X2) terms.append((f"{v1}:{v2}", X12)) return terms def _permute_labels_inplace(meta_df: pd.DataFrame, cols_to_permute: list[str], strata_cols: list[str] | None) -> pd.DataFrame: """Return a copy of meta_df where specified columns are permuted across rows. If strata_cols is given, permutation is done within each stratum block. Index and row order are preserved. """ out = meta_df.copy() if strata_cols is None: perm = rng.permutation(n) for col in cols_to_permute: out[col] = meta_df[col].to_numpy()[perm] return out for _, grp in meta_df.groupby(strata_cols, sort=False, observed=True): idx = grp.index.to_numpy() if idx.size <= 1: continue perm = rng.permutation(idx.size) for col in cols_to_permute: out.loc[idx, col] = grp[col].to_numpy()[perm] return out def _permute_joint_labels_inplace(meta_df: pd.DataFrame, cols: list[str], strata_cols: list[str] | None, rng: np.random.Generator) -> pd.DataFrame: """ Permute joint labels (rows of cols moved together) while preserving index. If strata_cols is provided, permute jointly within each stratum block. """ out = meta_df.copy() # Encode joint labels as tuples (fast, no delimiter issues) joint = list(zip(*(meta_df[c].to_numpy() for c in cols))) if strata_cols is None: perm = rng.permutation(len(joint)) joint_perm = [joint[i] for i in perm] for k, c in enumerate(cols): out[c] = [t[k] for t in joint_perm] return out # Permute within each stratum block for _, grp in meta_df.groupby(strata_cols, sort=False, observed=True): idx = grp.index.to_numpy() m = idx.size if m <= 1: continue joint_block = list(zip(*(grp[c].to_numpy() for c in cols))) perm = rng.permutation(m) joint_perm = [joint_block[i] for i in perm] for k, c in enumerate(cols): out.loc[idx, c] = [t[k] for t in joint_perm] return out # ---- Gower centering ---- D2 = dis.to_numpy(dtype=float) ** 2 np.fill_diagonal(D2, 0.0) D2 = (D2 + D2.T) / 2.0 G = I - np.ones((n, n), dtype=float) / n A = -0.5 * (G @ D2 @ G) total_SS = float(np.trace(A)) # ---- design & observed stats ---- terms = _build_terms(M) X_all = np.concatenate([X for _, X in terms], axis=1) if terms else np.empty((n, 0)) r_all = _rank(X_all) H_all = _hat(X_all) H_res = I - H_all SS_res = _ss(H_res, A) df_res = n - r_all if df_res <= 0: rows = [] for name, X in terms: if name == "Intercept": continue SS_t = np.nan df_t = 0 rows.append([name, df_t, SS_t, np.nan, np.nan, np.nan, np.nan]) rows.append(["Residual", df_res, SS_res, np.nan, np.nan, np.nan, (SS_res / total_SS) if total_SS > 0 else np.nan]) out_df = pd.DataFrame(rows, columns=["term","df","SS","MS","F","p","R2"]).set_index("term") return {"by": [t for t in out_df.index if t != "Residual"], "table": out_df, "permutations": permutations, "strata": (strata_cols if strata_cols else None), "perm_scheme": perm_scheme} rows = [] F_obs = [] df_list = [] # Cache per-term projections for re-use (also used by FL) per_term_info: dict[str, dict[str, np.ndarray | int]] = {} for i, (name_i, _) in enumerate(terms): if name_i == "Intercept": continue X_others = np.concatenate([X for j, (nm, X) in enumerate(terms) if j != i], axis=1) \ if len(terms) > 1 else np.empty((n, 0)) r_others = _rank(X_others) H_others = _hat(X_others) H_term = (H_all - H_others + (H_all - H_others).T) / 2.0 SS_t = _ss(H_term, A) df_t = r_all - r_others if df_t <= 0: MS_t = F_t = np.nan else: MS_t = SS_t / df_t MS_res = SS_res / df_res F_t = MS_t / MS_res R2_t = (SS_t / total_SS) if total_SS > 0 else np.nan rows.append([name_i, df_t, SS_t, MS_t, F_t, np.nan, R2_t]) F_obs.append(F_t) df_list.append(df_t) # Store for later use (Freedman–Lane) per_term_info[name_i] = { "H_others": H_others, "H_term": H_term, "df_t": df_t, } rows.append(["Residual", df_res, SS_res, SS_res/df_res, np.nan, np.nan, (SS_res / total_SS) if total_SS > 0 else np.nan]) out_df = pd.DataFrame(rows, columns=["term","df","SS","MS","F","p","R2"]).set_index("term") # ---- permutations ---- n_terms = len(F_obs) if permutations and n_terms > 0: rng = random_state if isinstance(random_state, np.random.Generator) else np.random.default_rng(random_state) F_null = np.zeros((permutations, n_terms), dtype=float) # Helper: produce a permutation order (indices) respecting strata # (for label permutations we permute meta, for FL we permute residuals A_resid) index_to_pos = {lab: i for i, lab in enumerate(M.index)} def _permute_order_within_strata() -> np.ndarray: if strata_cols is None: return rng.permutation(n) order_list: list[int] = [] for _, grp in M.groupby(strata_cols, sort=False, observed=True): pos = np.array([index_to_pos[idx] for idx in grp.index], dtype=int) if pos.size <= 1: order_list.extend(pos.tolist()) else: perm_pos = pos[rng.permutation(pos.size)] order_list.extend(perm_pos.tolist()) return np.asarray(order_list, dtype=int) # ------- Branch 1: classical label permutations ------- if perm_scheme == "labels": for b in range(permutations): if len(by) == 1: M_perm = _permute_labels_inplace(M, list(by), strata_cols) else: M_perm = _permute_joint_labels_inplace(M, list(by), strata_cols, rng) if len(by) == 2: unchanged = (M_perm[by[0]].to_numpy() == M[by[0]].to_numpy()) & (M_perm[by[1]].to_numpy() == M[by[1]].to_numpy()) if unchanged.all(): raise RuntimeError("Joint permutation produced no changes; check strata sizes/permutation logic.") terms_p = _build_terms(M_perm) X_all_p = np.concatenate([X for _, X in terms_p], axis=1) if terms_p else np.empty((n, 0)) r_all_p = _rank(X_all_p) H_all_p = _hat(X_all_p) H_res_p = I - H_all_p SS_res_p = _ss(H_res_p, A) df_res_p = n - r_all_p if df_res_p <= 0: F_null[b, :] = np.nan continue term_names_p = [nm for nm, _ in terms_p if nm != "Intercept"] for i, name_i in enumerate(out_df.index[:-1]): # tested terms if name_i not in term_names_p: F_null[b, i] = np.nan continue X_others_p = np.concatenate( [X for (nm, X) in terms_p if nm not in ("Intercept", name_i)], axis=1 ) if len(terms_p) > 2 else np.empty((n, 0)) H_others_p = _hat(X_others_p) H_term_p = (H_all_p - H_others_p + (H_all_p - H_others_p).T) / 2.0 SS_t_p = _ss(H_term_p, A) df_t_p = _rank(X_all_p) - _rank(X_others_p) if df_t_p <= 0: F_null[b, i] = np.nan else: MS_t_p = SS_t_p / df_t_p MS_res_p = SS_res_p / df_res_p F_null[b, i] = MS_t_p / MS_res_p # ------- Branch 2: Freedman–Lane residual permutations ------- else: # perm_scheme == "freedman-lane" (default) # Precompute per-term components for FL: fitted (others) and residual (others) per_term_FL: dict[str, dict[str, np.ndarray | int]] = {} for name_i, info in per_term_info.items(): H_others_i = info["H_others"] A_fit_others_i = H_others_i @ A @ H_others_i R = (I - H_others_i) @ A @ (I - H_others_i) # residuals of reduced model # Symmetrize for numerical stability R = (R + R.T) / 2.0 per_term_FL[name_i] = { "A_fit_others": A_fit_others_i, "R": R, "H_term": info["H_term"], "df_t": info["df_t"], } for b in range(permutations): order = _permute_order_within_strata() # For each term, build A* = A_fit(others) + P R P^T and compute F for i, name_i in enumerate(out_df.index[:-1]): info = per_term_FL[name_i] df_t_i = info["df_t"] if df_t_i is None or df_t_i <= 0: F_null[b, i] = np.nan continue # Permute residuals of the reduced model R = info["R"] R_perm = R[order, :][:, order] # P R P^T # Pseudo-response under H0 for the term A_star = info["A_fit_others"] + R_perm # Compute F using same H_term and H_res from observed design SS_t_star = _ss(info["H_term"], A_star) MS_t_star = SS_t_star / df_t_i SS_res_star = _ss(H_res, A_star) MS_res_star = SS_res_star / df_res F_null[b, i] = MS_t_star / MS_res_star # ---- p-values with robustness checks ---- for i, name in enumerate(out_df.index[:-1]): Fi = out_df.loc[name, "F"] df_t = out_df.loc[name, "df"] if not np.isfinite(Fi) or df_t <= 0: out_df.loc[name, "p"] = np.nan continue perm_vals = F_null[:, i] perm_vals = perm_vals[np.isfinite(perm_vals)] if perm_vals.size == 0: out_df.loc[name, "p"] = np.nan continue p = (np.sum(perm_vals >= Fi) + 1) / (perm_vals.size + 1) if df_t > 0: if perm_vals.size == 0: out_df.loc[name, "p"] = np.nan continue if np.unique(np.round(perm_vals, 12)).size == 1: out_df.loc[name, "p"] = np.nan continue if strata_cols is not None and any( (col == name or name.startswith(col + ":")) and any(len(g) <= 1 for _, g in M.groupby(strata_cols, observed=True)) for col in strata_cols ): out_df.loc[name, "p"] = np.nan continue out_df.loc[name, "p"] = p return { "by": [t for t in out_df.index if t != "Residual"], "table": out_df, "permutations": permutations, "strata": (strata_cols if strata_cols else None), "perm_scheme": perm_scheme, }
# ----------------------------------------------------------------------------- # Gower # -----------------------------------------------------------------------------
[docs] def gower( meta: Union[pd.DataFrame, Dict[str, Any], Any] = None, *, by: Union[Sequence[str], str, None] = None, return_similarity: bool = False, ) -> pd.DataFrame: """ Compute the Gower distance matrix for a pandas DataFrame containing mixed variable types (numeric, categorical/boolean, datetime). Parameters ---------- meta : pd.DataFrame, dict, or MicrobiomeData object Input data. Rows are samples; columns are variables. by : Sequence[str] or str, optional Variable names (columns) to include. If None, all columns are included. return_similarity : bool, optional If True, return Gower similarity (1 - distance). Default False (distance). Returns ------- pandas.DataFrame Pairwise Gower distances (or similarities) between samples (rows). Notes ----- - Numerical variables are scaled by their range (max - min). If the range is 0 (constant column), that variable contributes 0 for all pairs. - Datetime variables are converted to days (float) and treated as numeric. - Categorical/boolean variables contribute 0 when equal, 1 when different. - Missing values: a variable only contributes for row pairs where it is present in both rows; the per-pair denominator is the count of contributing variables for that pair. """ # Obtain DataFrame df = get_df(meta, "meta") if df is None or df.empty: raise ValueError("Input 'meta' is missing or empty.") # Column selection if by is None: X = df.copy() elif isinstance(by, str): if by not in df.columns: raise ValueError(f"Variable '{by}' not found in columns.") X = df[[by]].copy() else: by = list(by) missing = [c for c in by if c not in df.columns] if missing: raise ValueError(f"Variables not found in columns: {missing}") X = df[by].copy() if X.shape[1] == 0: raise ValueError("No variables selected (empty 'by').") # Determine variable types cols_num: List[str] = [] cols_cat: List[str] = [] NS_PER_DAY = 86_400_000_000_000 # nanoseconds per day for col in X.columns: s = X[col] if is_datetime64_any_dtype(s): # Convert datetime to numeric (days, float); preserve NaT as NaN # .view('int64') works for tz-naive; for tz-aware, .astype('int64') is fine in recent pandas vals_int = s.view("int64") vals_days = vals_int.astype("float") / NS_PER_DAY X[col] = vals_days cols_num.append(col) elif is_numeric_dtype(s): cols_num.append(col) elif is_bool_dtype(s) or is_categorical_dtype(s) or s.dtype == object: cols_cat.append(col) else: # Fallback: treat as categorical cols_cat.append(col) n = len(X) # Early return for single row if n == 1: return pd.DataFrame( np.array([[0.0 if not return_similarity else 1.0]]), index=X.index, columns=X.index, ) num = np.zeros((n, n), dtype=np.float64) # numerator: sum of per-variable distances den = np.zeros((n, n), dtype=np.float64) # denominator: count of valid variables per pair # --- Numerical variables --- for col in cols_num: x = X[col].to_numpy(dtype=float) mask = ~np.isnan(x) both = np.outer(mask, mask) # Range scaling (exclude NaNs) valid_vals = x[mask] if valid_vals.size == 0: # No contribution; but den remains unchanged (no both) continue r = valid_vals.max() - valid_vals.min() if not np.isfinite(r) or r == 0.0: # Constant or non-finite range -> contributes 0 where both present contrib = np.zeros((n, n), dtype=float) else: diffs = np.abs(x[:, None] - x[None, :]) / r diffs[~both] = 0.0 contrib = diffs num += contrib den += both.astype(float) # --- Categorical / Boolean / Object variables --- # Compare factorized integer codes rather than Python objects (faster, leaner) for col in cols_cat: s = X[col] # Factorize with NaN as -1 codes, uniques = pd.factorize(s, sort=False, use_na_sentinel=True) codes = codes.astype(np.int64) mask = codes != -1 both = np.outer(mask, mask) # Diff = 1 where codes differ, 0 where equal # Using broadcasting on integer codes eq = (codes[:, None] == codes[None, :]) diff = (~eq).astype(float) diff[~both] = 0.0 num += diff den += both.astype(float) # Finalize with np.errstate(invalid="ignore", divide="ignore"): dist = num / den dist[den == 0] = np.nan # no comparable variables for that pair # Convert to similarity if requested result = 1.0 - dist if return_similarity else dist # Diagonal np.fill_diagonal(result, 0.0 if not return_similarity else 1.0) return pd.DataFrame(result, index=X.index, columns=X.index)