Source code for qdiv.stats.ordination_calculations

"""
Ordination utilities: PCoA, db-RDA, and marginal (partial) permutation tests.

Public API:
    - pcoa_lingoes
    - dbrda
    - summarize_dbrda
"""
from __future__ import annotations
import warnings
import numpy as np
import pandas as pd
from numpy.linalg import svd, lstsq
from typing import Union, List, Optional, Any, Dict
from ..utils import get_df

__all__ = [
    "pcoa_lingoes",
    "dbrda",
    "marginal_factor_tests_dbrda",
    "summarize_dbrda"]

# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------

def _encode_metadata_df(meta: pd.DataFrame, drop_first: bool = True):
    """
    Encode metadata into a design matrix X:
      - numeric columns: centered
      - categorical columns: dummy-coded (treatment coding if drop_first=True)

    Returns
    -------
    X_df : pandas.DataFrame
        Encoded design matrix aligned to meta.index.
    names : list[str]
        Column names of X_df.
    cont_mask : numpy.ndarray[bool]
        Mask for columns that came from numeric covariates (continuous).
    """
    num_cols = meta.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = [c for c in meta.columns if c not in num_cols]

    X_num = pd.DataFrame(index=meta.index)
    if num_cols:
        X_num = meta[num_cols].astype(float)
        X_num = X_num - X_num.mean(axis=0)

    X_cat = pd.DataFrame(index=meta.index)
    if cat_cols:
        X_cat = pd.get_dummies(meta[cat_cols], drop_first=drop_first)

    X = pd.concat([X_num, X_cat], axis=1)
    if X.shape[1] == 0:
        raise ValueError("Metadata produced an empty design matrix.")
    names = list(X.columns)
    cont_mask = np.array([c in num_cols for c in names], dtype=bool)
    return X, names, cont_mask


def _hat_matrix(X: np.ndarray) -> np.ndarray:
    """Projection (hat) matrix for possibly rank-deficient X via SVD."""
    if X.size == 0 or X.shape[1] == 0:
        # No columns: projection is a zero matrix
        return np.zeros((X.shape[0], X.shape[0]), dtype=float)
    U, s, VT = svd(X, full_matrices=False)
    tol = np.finfo(float).eps * max(X.shape) * (s[0] if s.size > 0 else 0.0)
    s_inv = np.array([1.0/si if si > tol else 0.0 for si in s])
    X_pinv = (VT.T * s_inv) @ U.T
    return X @ X_pinv


def _residualize(M: np.ndarray, Z: np.ndarray) -> np.ndarray:
    """Residualize matrix M with respect to Z."""
    HZ = _hat_matrix(Z)
    return (np.eye(Z.shape[0]) - HZ) @ M


def _inertia_from_projection(H: np.ndarray, Y: np.ndarray) -> float:
    """
    Constrained inertia for multivariate response Y under projection H:
    sum of column variances of H @ Y (ddof=0).
    """
    fitted = H @ Y
    return float(np.sum(np.var(fitted, axis=0)))


def _build_design_with_interactions(meta: pd.DataFrame,
                                    interactions=None,
                                    drop_first: bool = True):
    """
    Build design matrix with main effects and optional categorical interactions.

    Parameters
    ----------
    meta : pd.DataFrame
        Explanatory variables (numeric + categorical).
    interactions : list[tuple[str, str]] | None
        Pairs of CATEGORICAL factor names for which to create interaction dummies.
        Example: [('Biochar', 'Plant')]
    drop_first : bool
        Treatment coding for dummies. Keep True unless you handle collinearity manually.

    Returns
    -------
    X_df : pd.DataFrame
        Design matrix with interaction columns appended (if any).
    groups : dict[str, list[str]]
        Mapping from factor name (and interaction name "A×B") to column names in X_df.
    """
    interactions = interactions or []
    X_df, names, cont_mask = _encode_metadata_df(meta, drop_first=drop_first)

    # Identify original categorical columns
    num_cols = meta.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = [c for c in meta.columns if c not in num_cols]

    # Grouping: numeric 1:1, categorical by dummy prefix
    groups = {}
    for col in num_cols:
        if col in X_df.columns:
            groups[col] = [col]
    for col in cat_cols:
        cols = [c for c in X_df.columns if c.startswith(col + "_")]
        if cols:
            groups[col] = cols

    # Interactions only for categorical factors
    for (a, b) in interactions:
        if a not in cat_cols or b not in cat_cols:
            raise ValueError(f"Interactions are only supported between categorical factors. Got: {a}, {b}.")
        a_cols = [c for c in X_df.columns if c.startswith(a + "_")]
        b_cols = [c for c in X_df.columns if c.startswith(b + "_")]
        inter_cols = []
        for ac in a_cols:
            for bc in b_cols:
                new_name = f"{a}:{ac.split(a+'_',1)[1]}×{b}:{bc.split(b+'_',1)[1]}"
                X_df[new_name] = X_df[ac].values * X_df[bc].values
                inter_cols.append(new_name)
        if inter_cols:
            groups[f"{a}×{b}"] = inter_cols

    return X_df, groups

# -----------------------------------------------------------------------------
# PCoA
# -----------------------------------------------------------------------------
[docs] def pcoa_lingoes( dis: pd.DataFrame ) -> pd.DataFrame: """ Perform Principal Coordinates Analysis (PCoA) using the Lingoes correction. The Lingoes correction transforms a non‑Euclidean distance matrix into a Euclidean one by adding a constant to all squared distances, ensuring that all eigenvalues are non‑negative. PCoA is then performed on the corrected matrix to obtain principal coordinate axes. Parameters ---------- dis : pandas.DataFrame Square distance matrix (rows and columns represent samples). Values must be non‑negative and the matrix must be symmetric. Returns ------- coords_df : pandas.DataFrame Principal coordinate scores (samples × axes), ordered by decreasing eigenvalue magnitude. eigvals : pandas.Series Eigenvalues associated with each axis (only the positive eigenvalues after Lingoes correction). pct_explained : pandas.Series Percentage of total variance explained by each axis (positive eigenvalues only). total_variance : float Sum of all positive eigenvalues after correction. Notes ----- - The Lingoes correction is applied only if negative eigenvalues are detected. - The output coordinates are centered and scaled according to standard PCoA conventions. """ if not isinstance(dis, pd.DataFrame): raise TypeError("dist_df must be a pandas DataFrame.") if dis.shape[0] != dis.shape[1]: raise ValueError("Distance matrix must be square.") # Copy & coerce to float D_df = dis.copy().astype(float) # Enforce symmetry and zero diagonal D_df = ((D_df + D_df.T) / 2.0) D = D_df.to_numpy(copy=True) np.fill_diagonal(D, 0.0) D_df = pd.DataFrame(D, index=D_df.index, columns=D_df.columns) if (D < 0).any(): raise ValueError("Distances must be non‑negative.") labels = D_df.index.to_list() n = D_df.shape[0] J = np.eye(n) - np.ones((n, n)) / n def double_center(d): # Gower centering on squared distances return -0.5 * J @ (d ** 2) @ J # Initial Gower matrix B = double_center(D_df.values) # Numerical symmetry safeguard before eigh B = (B + B.T) / 2.0 eigvals, eigvecs = np.linalg.eigh(B) # Sort descending order = np.argsort(eigvals)[::-1] eigvals = eigvals[order] eigvecs = eigvecs[:, order] # Lingoes correction if negative eigenvalues exist if eigvals.min() < 0: c = 2.0 * abs(eigvals.min()) # Lingoes: add 2|λ_min| to squared distances D_corr = np.sqrt(np.maximum(D_df.values**2 + c, 0.0)) B = double_center(D_corr) B = (B + B.T) / 2.0 eigvals, eigvecs = np.linalg.eigh(B) order = np.argsort(eigvals)[::-1] eigvals = eigvals[order] eigvecs = eigvecs[:, order] # Keep positive eigenvalues (allow tiny negative due to round‑off) tol = max(1e-12, 1e-12 * np.max(np.abs(eigvals))) pos = eigvals > tol if not np.any(pos): raise ValueError("No positive eigenvalues after Lingoes correction; check the distance matrix.") eigvals_pos = eigvals[pos] eigvecs_pos = eigvecs[:, pos] # Coordinates scaled by sqrt(λ) coords = eigvecs_pos * np.sqrt(eigvals_pos) axis_names = [f"PCo{i+1}" for i in range(len(eigvals_pos))] coords_df = pd.DataFrame(coords, index=labels, columns=axis_names) eigvals_series = pd.Series(eigvals_pos, index=axis_names) explained_var_series = pd.Series((eigvals_pos / eigvals_pos.sum()) * 100.0, index=axis_names).round(2) out = { 'site_scores': coords_df, 'eigenvalues': eigvals_series, 'pct_explained': explained_var_series, 'total_variance': eigvals_series.sum()} return out
# ----------------------------------------------------------------------------- # db-RDA (global model + scores + per-variable contributions) # -----------------------------------------------------------------------------
[docs] def dbrda( dis: pd.DataFrame = None, meta: Union[pd.DataFrame, Dict[str, Any], Any] = None, *, by: Optional[Union[str, List[str]]] = None, condition: Optional[pd.DataFrame] = None, n_axes: int = 2, scale: str = "site", perm_n: int = 999, perm_seed: int = 42, pcoa_fn=pcoa_lingoes, per_var_perm: bool = False, interactions: Optional[List[str]] = None, drop_first: bool = True ) -> Dict[str, Any]: """ Distance‑based Redundancy Analysis (db‑RDA). This function performs constrained ordination on a distance matrix by: 1. Converting the distance matrix into principal coordinates (PCoA) using the specified PCoA function (default: Lingoes correction). 2. Regressing the PCoA coordinates onto explanatory variables. 3. Extracting constrained axes, biplot scores, and variance components. 4. Performing a global permutation test (Freedman–Lane). 5. Optionally computing per‑variable permutation p‑values. 6. Optionally including categorical interaction terms. Parameters ---------- dis : pandas.DataFrame Square distance matrix (samples × samples). Must have matching row/column labels. meta : pandas.DataFrame Metadata table containing explanatory variables (rows = samples). by : str or list of str, optional Subset of metadata columns to use as explanatory variables. If None, all columns in `meta` are used. condition : pandas.DataFrame, optional Conditioning variables for partial db‑RDA. Must align with `meta`. n_axes : int, default=2 Number of constrained axes to return. scale : {'site', 'species'}, default='site' Scaling for biplot scores. perm_n : int, default=999 Number of permutations for the global test. perm_seed : int, default=42 Random seed for reproducibility. pcoa_fn : callable, default=pcoa_lingoes Function used to compute PCoA. Must return a dict with 'site_scores' and 'eigenvalues'. per_var_perm : bool, default=False If True, compute permutation p‑values for each predictor. interactions : list of str, optional Variables for which interaction terms should be generated. drop_first : bool, default=True Whether to drop the first dummy level when encoding categorical variables. Returns ------- dict { 'site_scores' : pandas.DataFrame, 'biplot_scores' : pandas.DataFrame, 'variable_contributions' : pandas.DataFrame, 'eigenvalues' : numpy.ndarray, 'explained_ratio' : numpy.ndarray, 'total_inertia' : float, 'constrained_inertia' : float, 'unconstrained_inertia' : float, 'F_global' : float, 'p_global' : float } Notes ----- - The global permutation test uses the Freedman–Lane procedure. - Partial db‑RDA is performed by residualizing both the response coordinates and the design matrix against the conditioning variables. - Interaction terms are constructed before dummy encoding. """ meta = get_df(meta, "meta") if meta is None: raise ValueError('meta is missing.') # Select metadata columns if by is None: meta_use = meta.copy() elif isinstance(by, str): meta_use = meta[[by]].copy() elif isinstance(by, list): meta_use = meta[by].copy() else: raise TypeError("`by` must be None, a string, or a list of strings.") # Align metadata to distance matrix if not isinstance(dis, pd.DataFrame): raise TypeError("`dis` must be a pandas DataFrame.") if not meta_use.index.equals(dis.index): meta_use = meta_use.loc[dis.index] # PCoA pcoa_res = pcoa_fn(dis) coords = pcoa_res["site_scores"] eig_vals = pcoa_res["eigenvalues"] U_all = coords.values.astype(float) sample_index = coords.index total_inertia = float(np.sum(eig_vals)) # Build design matrix (with optional interactions) if interactions: X_df, _ = _build_design_with_interactions( meta_use, interactions=interactions, drop_first=drop_first ) xnames = list(X_df.columns) else: X_df, xnames, _ = _encode_metadata_df(meta_use, drop_first=drop_first) X = X_df.values.astype(float) # Partial db‑RDA (conditioning) if condition is not None: if condition.shape[0] != meta_use.shape[0]: raise ValueError("`condition` must have same number of rows as `meta`.") Z_df, _, _ = _encode_metadata_df(condition.loc[meta_use.index], drop_first=drop_first) Z = Z_df.values.astype(float) U_basis = _residualize(U_all, Z) X_basis = _residualize(X, Z) else: U_basis = U_all X_basis = X # Projection onto constrained space H = _hat_matrix(X_basis) fitted = (H @ U_basis).astype(float) # Constrained covariance matrix if fitted.shape[1] == 1: warnings.warn( "Only one positive eigenvalue detected. dbRDA will return a single axis.", UserWarning ) Cc = np.array([[np.var(fitted[:, 0], ddof=0)]]) else: Cc = np.cov(fitted.T, bias=True) # Eigen decomposition lam, V = np.linalg.eigh(Cc) order = np.argsort(lam)[::-1] lam = lam[order] V = V[:, order] sites_all = fitted @ V a = min(n_axes, sites_all.shape[1]) eigvals = lam[:a] sites = sites_all[:, :a] constrained_inertia = float(np.sum(lam)) explained_ratio = eigvals / total_inertia # Biplot scores biplot = [] for ax_i in range(a): coef, _, _, _ = lstsq(X_basis, sites[:, ax_i], rcond=None) biplot.append(coef) biplot = np.array(biplot).T if scale == "species": maxlen = np.max(np.linalg.norm(biplot, axis=0)) if maxlen > 0: biplot = biplot / maxlen # Global permutation test (Freedman–Lane) Hr = _hat_matrix(np.zeros((X_basis.shape[0], 0))) Hf = _hat_matrix(X_basis) Fitted0 = Hr @ U_basis E = U_basis - Fitted0 df_model = int(np.linalg.matrix_rank(X_basis)) df_resid = int(U_basis.shape[0] - df_model - 1) I_f = _inertia_from_projection(Hf, U_basis) I_unconstr = total_inertia - I_f F_obs = (I_f / max(df_model, 1)) / (I_unconstr / max(df_resid, 1)) rng = np.random.RandomState(perm_seed) hits = 0 for _ in range(perm_n): perm = rng.permutation(U_basis.shape[0]) Y_star = Fitted0 + E[perm, :] I_f_star = _inertia_from_projection(Hf, Y_star) F_star = (I_f_star / max(df_model, 1)) / ( (total_inertia - I_f_star) / max(df_resid, 1) ) hits += (F_star >= F_obs) pval_global = (hits + 1) / (perm_n + 1) # Per‑variable contributions contributions = [] pvals = [] for i in range(X_basis.shape[1]): Xi = X_basis[:, [i]] Hi = _hat_matrix(Xi) inertia_i = float(np.sum(np.var(Hi @ U_basis, axis=0))) contributions.append(100.0 * inertia_i / total_inertia) if per_var_perm: h = 0 for _ in range(perm_n): Xp = Xi[rng.permutation(Xi.shape[0]), :] Hp = _hat_matrix(Xp) inertia_p = float(np.sum(np.var(Hp @ U_basis, axis=0))) if inertia_p >= inertia_i: h += 1 pvals.append((h + 1) / (perm_n + 1)) else: pvals.append(None) # Output axis_names = [f"dbRDA{i+1}" for i in range(a)] return { "site_scores": pd.DataFrame(sites, index=sample_index, columns=axis_names), "biplot_scores": pd.DataFrame(biplot, index=xnames, columns=axis_names), "variable_contributions": pd.DataFrame({ "Predictor": xnames, "pct_explained": contributions, "p-value": pvals }), "eigenvalues": eigvals, "explained_ratio": explained_ratio, "total_inertia": total_inertia, "constrained_inertia": constrained_inertia, "unconstrained_inertia": total_inertia - constrained_inertia, "F_global": F_obs, "p_global": pval_global }
# ----------------------------------------------------------------------------- # Marginal (partial) permutation tests (Freedman–Lane) # -----------------------------------------------------------------------------
[docs] def marginal_factor_tests_dbrda( dis: pd.DataFrame, meta: pd.DataFrame, *, by: Optional[Union[str, List[str]]] = None, condition: Optional[pd.DataFrame] = None, interactions: Optional[List[str]] = None, pcoa_fn=pcoa_lingoes, perm_n: int = 999, perm_seed: int = 42, drop_first: bool = True, return_F: bool = True ) -> pd.DataFrame: """ Marginal (partial) permutation tests for db‑RDA factors. Performs Freedman–Lane marginal tests for each factor block in the design matrix, controlling for all other terms. Also computes diagnostics based on the factor block alone (XG): - inertia_alone - pct_explained (alone) - p-alone (simple permutation test ignoring other terms) Parameters ---------- dis : pandas.DataFrame Square distance matrix (samples × samples). meta : pandas.DataFrame Metadata table (rows = samples). by : str or list of str, optional Subset of metadata columns to include as explanatory variables. If None, all columns in `meta` are used. condition : pandas.DataFrame, optional Conditioning variables for partial db‑RDA. interactions : list of str, optional Variables for which interaction terms should be generated. pcoa_fn : callable, default=pcoa_lingoes Function returning {'site_scores', 'eigenvalues'}. perm_n : int, default=999 Number of permutations. perm_seed : int, default=42 Random seed. drop_first : bool, default=True Whether to drop the first dummy level when encoding categorical variables. return_F : bool, default=True If True, compute F‑statistics; otherwise use raw delta inertia. Returns ------- pandas.DataFrame Columns: Factor df_added delta_inertia pct_explained (marginal) inertia_alone pct_explained (alone) F p-marginal p-alone """ # Select metadata columns if by is None: meta_use = meta.copy() elif isinstance(by, str): meta_use = meta[[by]].copy() elif isinstance(by, list): meta_use = meta[by].copy() else: raise TypeError("`by` must be None, a string, or a list of strings.") # Align metadata to distance matrix if not meta_use.index.equals(dis.index): meta_use = meta_use.loc[dis.index] # PCoA pcoa_res = pcoa_fn(dis) coords = pcoa_res["site_scores"] eig_vals = pcoa_res["eigenvalues"] Y_all = coords.values.astype(float) total_inertia = float(np.sum(eig_vals)) n = Y_all.shape[0] # Build design matrix X_df, groups = _build_design_with_interactions( meta_use, interactions=interactions, drop_first=drop_first ) X_all = X_df.values.astype(float) # Conditioning (partial db‑RDA) if condition is not None: if condition.shape[0] != meta_use.shape[0]: raise ValueError("`condition` must have same number of rows as `meta`.") Z_df, _, _ = _encode_metadata_df(condition.loc[meta_use.index], drop_first=drop_first) Z = Z_df.values.astype(float) Y = _residualize(Y_all, Z) X = _residualize(X_all, Z) else: Y = Y_all X = X_all rng = np.random.default_rng(perm_seed) rows = [] def rank(A): return int(np.linalg.matrix_rank(A)) if A.size > 0 else 0 # Loop over each factor block for factor, cols in groups.items(): # Identify columns belonging to this factor idx = [X_df.columns.get_loc(c) for c in cols] mask = np.zeros(X.shape[1], dtype=bool) mask[idx] = True XG = X[:, mask] # factor block Xr = X[:, ~mask] # reduced model without factor rk_r = rank(Xr) rk_full = rank(X) df_added = rk_full - rk_r # Aliased factor (no degrees of freedom) if df_added <= 0: rows.append({ "Factor": factor, "df_added": 0, "delta_inertia": 0.0, "pct_explained (marginal)": 0.0, "inertia_alone": 0.0, "pct_explained (alone)": 0.0, "F": np.nan, "p-marginal": np.nan, "p-alone": np.nan, "note": "aliased (df_added=0)" }) continue # Marginal contribution (Freedman–Lane) Hr = _hat_matrix(Xr) Hf = _hat_matrix(X) I_r = _inertia_from_projection(Hr, Y) I_f = _inertia_from_projection(Hf, Y) delta_obs = I_f - I_r df_resid = n - rk_full - 1 if return_F: denom = (total_inertia - I_f) / max(df_resid, 1) F_obs = (delta_obs / df_added) / denom if denom > 0 else np.inf else: F_obs = np.nan # Permutation test (marginal) Fitted_r = Hr @ Y E = Y - Fitted_r hits = 0 for _ in range(perm_n): perm = rng.permutation(n) Y_star = Fitted_r + E[perm, :] I_r_star = _inertia_from_projection(Hr, Y_star) I_f_star = _inertia_from_projection(Hf, Y_star) delta_star = I_f_star - I_r_star if return_F: denom_star = (total_inertia - I_f_star) / max(df_resid, 1) F_star = (delta_star / df_added) / denom_star if denom_star > 0 else np.inf if F_star >= F_obs: hits += 1 else: if delta_star >= delta_obs: hits += 1 pval_marginal = (hits + 1) / (perm_n + 1) # Diagnostics: factor alone (XG) Hi = _hat_matrix(XG) inertia_alone = _inertia_from_projection(Hi, Y) pct_alone = 100.0 * inertia_alone / total_inertia # Simple permutation test for factor alone hits_alone = 0 for _ in range(perm_n): perm = rng.permutation(n) inertia_star = _inertia_from_projection(Hi, Y[perm, :]) if inertia_star >= inertia_alone: hits_alone += 1 pval_alone = (hits_alone + 1) / (perm_n + 1) rows.append({ "Factor": factor, "df_added": int(df_added), "delta_inertia": float(delta_obs), "pct_explained (marginal)": 100.0 * float(delta_obs) / total_inertia, "inertia_alone": float(inertia_alone), "pct_explained (alone)": pct_alone, "F": float(F_obs), "p-marginal": float(pval_marginal), "p-alone": float(pval_alone) }) # Output out = ( pd.DataFrame(rows) .sort_values(["p-marginal", "pct_explained (marginal)"], ascending=[True, False]) .reset_index(drop=True) ) return out
# ----------------------------------------------------------------------------- # Summarize db-RDA # -----------------------------------------------------------------------------
[docs] def summarize_dbrda( dis: pd.DataFrame, meta: pd.DataFrame, *, by: Optional[Union[str, List[str]]] = None, condition: Optional[pd.DataFrame] = None, interactions: Optional[List[str]] = None, pcoa_fn=pcoa_lingoes, perm_n: int = 999, perm_seed: int = 42, drop_first: bool = True, include_interpretation: bool = True, include_alone: bool = True ) -> pd.DataFrame: """ Summarize db‑RDA (global model + marginal factor tests). This function: 1. Runs dbRDA once (global model). 2. Runs marginal (partial) permutation tests per factor (Freedman–Lane). 3. Aggregates % explained by original factors (from the full model). 4. Computes R² and adjusted R². 5. Returns a tidy DataFrame, optionally with textual interpretation. Parameters ---------- dist : pandas.DataFrame Square distance matrix (rows/cols = samples). Index must match columns. meta : pandas.DataFrame Metadata indexed by sample IDs. by : str or list of str, optional Subset of metadata columns to use as explanatory variables. If None, all columns in `meta` are used. condition : pandas.DataFrame, optional Covariates to partial out (same index as `meta`). interactions : list of str, optional Variables for which interaction terms should be generated. pcoa_fn : callable, default=pcoa_lingoes Function for the PCoA step; must return 'site_scores' and 'eigenvalues'. perm_n : int, default=999 Number of permutations for marginal tests. perm_seed : int, default=42 Random seed for permutations. drop_first : bool, default=True Drop first level in categorical encoding (reference coding). include_interpretation : bool, default=True If True, adds a textual interpretation column. include_alone : bool, default=True If True, keeps “alone” diagnostics (factor-alone %-explained, p-alone). Returns ------- pandas.DataFrame Columns (by default): - Factor - pct_explained (full model) - df_added - delta_inertia - pct_explained (marginal) - F - p-marginal - inertia_alone - pct_explained (alone) - p-alone - Interpretation (optional) Attributes (df.attrs): - 'R²' : float - 'Adjusted R²' : float - 'F_global' : float - 'p_global' : float - 'Total inertia' : float - 'Constrained inertia' : float - 'Unconstrained inertia' : float - 'n' : int (samples) - 'df_model' : int (approx. number of fitted parameters) """ # --------------------------- # Run dbRDA (global model) # --------------------------- res = dbrda( dis=dis, meta=meta, by=by, condition=condition, interactions=interactions, pcoa_fn=pcoa_fn, per_var_perm=False, # keep dbRDA fast; marginal perms are run below drop_first=drop_first, perm_n=perm_n, perm_seed=perm_seed ) # Sample size and df_model (approx.) n = dis.shape[0] if isinstance(dis, pd.DataFrame) else len(dis) try: df_model = int(np.linalg.matrix_rank(res["biplot_scores"].values)) except Exception: df_model = int(res.get("biplot_scores", pd.DataFrame()).shape[1]) # Global fit metrics total_inertia = float(res["total_inertia"]) constr_inertia = float(res["constrained_inertia"]) unconstr_inertia = float(res["unconstrained_inertia"]) R2 = constr_inertia / total_inertia if total_inertia > 0 else 0.0 # Adjusted R² denom = (n - df_model - 1) if denom > 0: adjR2 = 1.0 - (1.0 - R2) * ((n - 1.0) / denom) else: adjR2 = np.nan if isinstance(adjR2, float) and adjR2 < 0 and adjR2 > -1e-12: adjR2 = 0.0 # Aggregate %-Explained by original factors (full model) vc = res["variable_contributions"].copy() if not {"Predictor", "pct_explained"}.issubset(vc.columns): raise ValueError( "`dbrda` result missing expected 'variable_contributions' columns." ) # Use only selected columns for factor grouping if by is None: meta_use = meta elif isinstance(by, str): meta_use = meta[[by]] else: meta_use = meta[by] vc["Predictor"] = vc["Predictor"].astype(str) num_cols = meta_use.select_dtypes(include=[np.number]).columns.tolist() cat_cols = [c for c in meta_use.columns if c not in num_cols] rows: List[Dict[str, Any]] = [] # Numeric predictors: aggregate by exact name for col in num_cols: mask = (vc["Predictor"] == col) if mask.any(): rows.append({ "Factor": col, "pct_explained (full model)": float(vc.loc[mask, "pct_explained"].sum()) }) # Categorical predictors: aggregate across one-hot columns (prefix match) for col in cat_cols: mask = vc["Predictor"].str.startswith(col + "_") if mask.any(): rows.append({ "Factor": col, "pct_explained (full model)": float(vc.loc[mask, "pct_explained"].sum()) }) # Optional: interactions aggregation (if naming convention matches) if interactions: for a in interactions: # if interactions is list[str], treat each as name used in design mask = vc["Predictor"].str.contains(a) if mask.any(): rows.append({ "Factor": a, "pct_explained (full model)": float(vc.loc[mask, "pct_explained"].sum()) }) agg_full = pd.DataFrame(rows) if agg_full.empty: # Fallback: group by a simple "root" of predictor name def _root(p: str) -> str: for sep in ["×", ":", "_"]: if sep in p: return p.split(sep)[0] return p tmp = vc.groupby(vc["Predictor"].map(_root))["pct_explained"].sum().reset_index() tmp.columns = ["Factor", "pct_explained (full model)"] agg_full = tmp # Marginal tests (per factor) mt = marginal_factor_tests_dbrda( dis=dis, meta=meta, by=by, condition=condition, interactions=interactions, pcoa_fn=pcoa_fn, perm_n=perm_n, perm_seed=perm_seed, drop_first=drop_first, return_F=True ) # Merge and finalize out = mt.merge(agg_full, on="Factor", how="left") if "pct_explained (full model)" not in out.columns: out["pct_explained (full model)"] = np.nan # Optional interpretation column if include_interpretation: def _interpret(row: pd.Series) -> str: marginal = row.get("pct_explained (marginal)", np.nan) alone = row.get("pct_explained (alone)", np.nan) p_marg = row.get("p-marginal", np.nan) p_alone = row.get("p-alone", np.nan) if pd.notna(p_marg) and p_marg < 0.05: return f"Significant unique effect (marginal {marginal:.2f}%)" if pd.notna(p_alone) and p_alone < 0.05: return f"Significant alone ({alone:.2f}%), overlaps with others" return "Not significant; effect likely weak or redundant" out["Interpretation"] = out.apply(_interpret, axis=1) # Column order keep_cols = [ "Factor", "pct_explained (full model)", "df_added", "delta_inertia", "pct_explained (marginal)", "F", "p-marginal", ] if include_alone: keep_cols += ["inertia_alone", "pct_explained (alone)", "p-alone"] if include_interpretation: keep_cols += ["Interpretation"] keep_cols = [c for c in keep_cols if c in out.columns] out = out[keep_cols].copy() # Sort by significance then effect size sort_keys = [k for k in ["p-marginal", "pct_explained (marginal)"] if k in out.columns] sort_asc = [True, False][:len(sort_keys)] if sort_keys: out = out.sort_values(sort_keys, ascending=sort_asc).reset_index(drop=True) # Attach attributes out.attrs["R²"] = round(float(R2), 4) out.attrs["Adjusted R²"] = None if pd.isna(adjR2) else round(float(adjR2), 4) out.attrs["F_global"] = float(res.get("F_global", np.nan)) out.attrs["p_global"] = float(res.get("p_global", np.nan)) out.attrs["Total inertia"] = round(total_inertia, 6) out.attrs["Constrained inertia"] = round(constr_inertia, 6) out.attrs["Unconstrained inertia"] = round(unconstr_inertia, 6) out.attrs["n"] = int(n) out.attrs["df_model"] = int(df_model) return out