Source code for qdiv.diversity.alpha_div

import pandas as pd
import numpy as np
import math
from typing import Union, Any, Dict
from ..utils import rao, get_df, subset_tree_df, compute_Tmean, ra_to_branches

# -----------------------------------------------------------------------------
# Naive alpha diversity
# -----------------------------------------------------------------------------
[docs] def naive_alpha( tab: Union[pd.DataFrame, Dict[str, Any], Any], *, q: float = 1, use_values_in_tab: bool = False ) -> Union[pd.Series, float]: """ Compute naive alpha diversity of order *q* for all samples. Accepts: - DataFrame: features x samples - MicrobiomeData-like object: must expose a DataFrame in .tab / .table / .counts / .abundance - dict-of-dicts: either {feature: {sample: count}} or {sample: {feature: count}} Parameters ---------- tab : DataFrame | MicrobiomeData-like | dict Abundance table (features x samples) or convertible structure. q : float, default=1 Diversity order: - q = 0 : species richness - q = 1 : exponential of Shannon entropy - q = 2 : inverse Simpson - general q : Hill number of order q use_values_in_tab : bool, default=False If False (default), values are converted to relative abundances. If True, values in `tab` are assumed to already be relative abundances. Returns ------- pandas.Series or float Hill numbers for each sample. If input has one sample/column, returns a float. Notes ----- - For q = 1, the limit definition is used: H₁ = exp( - Σ pᵢ ln pᵢ ) - For q ≠ 1: H_q = ( Σ pᵢ^q )^( 1 / (1 - q) ) - Zero abundances are ignored safely. """ # --- Get DataFrame (features x samples) ----------------------------- tab = get_df(tab, "tab") # Ensure numeric try: tab = tab.astype(float) except Exception as e: raise TypeError( "Abundance table contains non-numeric values. Ensure counts/abundances are numeric." ) from e # --- Relative abundances -------------------------------------------------- if use_values_in_tab: ra = tab else: col_sums = tab.sum(axis=0) if (col_sums == 0).any(): bad = col_sums.index[col_sums == 0].tolist() raise ValueError(f"One or more samples have zero total abundance: {bad}") ra = tab.div(col_sums, axis=1) if q == 1: H = -(ra.where(ra > 0) * np.log(ra.where(ra > 0))).sum() return np.exp(H) else: rapow = ra.copy() rapow[ra > 0] = ra[ra > 0].pow(q) rapow = rapow.sum() Hillvalues = rapow Hillvalues[Hillvalues > 0] = Hillvalues[Hillvalues > 0].pow(1.0 / (1.0 - q)) return Hillvalues
# ----------------------------------------------------------------------------- # Phylogenetic alpha diversity # -----------------------------------------------------------------------------
[docs] def phyl_alpha( obj: Union[Dict[str, Any], Any], *, q: float = 1, index: str = "D", use_values_in_tab: bool = False ) -> Union[pd.Series, float]: """ Compute phylogenetic alpha diversity based on Hill numbers. This function implements the abundance-weighted phylogenetic diversity framework of Chao et al. (2010, Phil. Trans. R. Soc. B). Diversity is computed at the level of tree branches, where each branch is weighted by its length and by the total relative abundance of all descendant features. The primary quantity returned is the *mean phylogenetic diversity* D̄_q(T), which is a true Hill number (dimensionless, continuous, and monotone in q). A branch-length–scaled quantity (phylogenetic diversity, PD_q) can optionally be returned as a derived measure. Parameters ---------- obj : MicrobiomeData-like | dict Object containing an abundance table (`tab`) and a tree dataframe (`tree`). The tree dataframe must include: - 'leaves' : list of descendant leaves for each branch - 'branchL' : branch length q : float, default=1 Diversity order: - q = 0 : presence/absence weighting (Faith’s PD when index='PD') - q = 1 : exponential phylogenetic Shannon diversity - q = 2 : phylogenetic inverse Simpson diversity - general q : phylogenetic Hill number index : {'D', 'PD', 'H'}, default='D' Quantity to return: - 'D' : mean phylogenetic diversity D̄_q(T) (dimensionless; Hill number) - 'PD' : branch diversity PD_q(T) = T · D̄_q(T) - 'H' : entropy-like intermediate quantity: * q = 1 : phylogenetic entropy divided by T * q ≠ 1 : power-sum moment Σ_b (L_b/T) a_b^q use_values_in_tab : bool, default=False If False, abundances are converted to relative abundances per sample. If True, the abundance table is assumed to already contain relative abundances. Returns ------- pandas.Series A vector of diversity values, one per sample. Notes ----- For each sample j, the mean tree height is computed as: T_j = Σ_b L_b · a_{b,j} Mean phylogenetic diversity is defined as: D̄_q(T) = ( Σ_b (L_b / T_j) · a_{b,j}^q )^(1 / (1 − q)), q ≠ 1 D̄_1(T) = exp( − Σ_b (L_b / T_j) · a_{b,j} · log a_{b,j} ) where a_{b,j} is the total relative abundance descending from branch b. The branch diversity PD_q(T) = T_j · D̄_q(T) has units of branch length (or evolutionary time) and represents effective evolutionary work. Unlike D̄_q(T), PD_q(T) is not a Hill number for q ≠ 0, 1 and is not guaranteed to be monotone in q. """ # Get input tab = get_df(obj, "tab") tree = get_df(obj, "tree") if "leaves" not in tree.columns or "branchL" not in tree.columns: raise ValueError("`tree` must contain columns 'leaves' and 'branchL'.") # Ensure numeric try: tab = tab.astype(float) except Exception as e: raise TypeError( "Abundance table contains non-numeric values. Ensure counts/abundances are numeric." ) from e # --- Relative abundances -------------------------------------------------- if use_values_in_tab: ra = tab else: col_sums = tab.sum(axis=0) if (col_sums == 0).any(): bad = col_sums.index[col_sums == 0].tolist() raise ValueError(f"One or more samples have zero total abundance: {bad}") ra = tab.div(col_sums, axis=1) #Subset tree to features in tab tree = subset_tree_df(tree, ra.index.tolist()) #Function from utils # Build branch × sample abundance matrix tree2 = ra_to_branches(ra, tree) #Function from utils # Get Tmean Tmean = compute_Tmean(tree, tree2) #Function from utils if abs(q - 1.0) < 1e-6: mask = tree2 > 0 logp = pd.DataFrame(0.0, index=tree2.index, columns=tree2.columns) logp[mask] = np.log(tree2[mask]) # - sum_b (L_b * a_{b,j} * log a_{b,j}) / T_j term = ((tree2 * logp).mul(tree["branchL"], axis=0).div(Tmean, axis=1).sum(axis=0)) D = np.exp(-term) else: tree_calc = tree2.copy() # a_{b,j}^q tree_calc[tree_calc > 0] = tree_calc[tree_calc > 0].pow(q) # L_b * a_{b,j}^q tree_calc = tree_calc.mul(tree["branchL"], axis=0) # (L_b * a_{b,j}^q) / T_j tree_calc = tree_calc.div(Tmean, axis=1) # sum_b L_b a_{b,j}^q / T_j term = tree_calc.sum(axis=0) # D̄_q(T) = [term]^(1/(1−q)) D = term.copy() D[D > 0] = D[D > 0].pow(1.0 / (1.0 - q)) # Return requested index if index == "PD": return D * Tmean elif index == "D": return D elif index == "H": return term # entropy at q=1, moment at q≠1 else: raise ValueError("`index` must be one of: 'PD', 'D', 'H'.")
# ----------------------------------------------------------------------------- # Functional alpha diversity # -----------------------------------------------------------------------------
[docs] def func_alpha( tab: Union[pd.DataFrame, Dict[str, Any], Any], distmat: pd.DataFrame, *, q: float = 1, index: str = "FD", use_values_in_tab: bool = False ) -> Union[pd.Series, float]: """ Compute functional alpha diversity (Hill numbers) of order *q*. Implements the framework of Chiu et al. (2014, PLoS ONE), where functional diversity is derived from pairwise trait distances and species abundances. For each sample, functional diversity is computed from: Q = Σᵢ Σⱼ pᵢ pⱼ dᵢⱼ (Rao's quadratic entropy) and the functional Hill number of order q: q = 1: FD₁ = exp( -½ Σᵢ Σⱼ (pᵢ pⱼ ln(pᵢ pⱼ)) dᵢⱼ / Q ) q ≠ 1: FD_q = ( Σᵢ Σⱼ (pᵢ pⱼ)ᵠ dᵢⱼ / Q )^( 1 / (2(1−q)) ) Parameters ---------- tab : DataFrame | MicrobiomeData-like | dict Abundance table (features x samples) or convertible structure. distmat : pandas.DataFrame Functional distance matrix (features × features). q : float, default=1 Diversity order. index : {'FD', 'D', 'MD'}, default='FD' Output type: - 'D' : functional Hill number - 'MD' : mean functional diversity (D × Q) - 'FD' : functional diversity (D × MD) use_values_in_tab : bool, default=False If False, convert abundances to relative abundances. If True, assume `tab` already contains relative abundances. Returns ------- pandas.Series Functional diversity values for each sample. Notes ----- - Uses Rao's Q as implemented in your `rao()` function. - Zero abundances are handled safely. """ # Get input tab = get_df(tab, "tab") if not isinstance(distmat, pd.DataFrame): raise TypeError("`distmat` must be a pandas DataFrame.") # Ensure numeric try: tab = tab.astype(float) except Exception as e: raise TypeError( "Abundance table contains non-numeric values. Ensure counts/abundances are numeric." ) from e # --- Relative abundances -------------------------------------------------- if use_values_in_tab: ra = tab else: col_sums = tab.sum(axis=0) if (col_sums == 0).any(): bad = col_sums.index[col_sums == 0].tolist() raise ValueError(f"One or more samples have zero total abundance: {bad}") ra = tab.div(col_sums, axis=1) if isinstance(ra, pd.Series): raise ValueError("`tab` must be a DataFrame, not a Series.") # Align distance matrix to ASVs asvs = ra.index.tolist() distmat = distmat.loc[asvs, asvs] # Output container out = pd.Series(0.0, index=ra.columns) svlist = ra.index.tolist() distmat = distmat.loc[svlist, svlist] Qframe = rao(ra, distmat) if q == 1: for smp in ra.columns: ra2mat = pd.DataFrame(np.outer(ra[smp].to_numpy(), ra[smp].to_numpy()), index=ra.index, columns=ra.index) ra2Lnmat = ra2mat.copy() mask = ra2Lnmat > 0 ra2Lnmat[mask] = ra2Lnmat[mask].map(math.log) ra2ochLn = ra2mat.mul(ra2Lnmat) dQmat = distmat.mul(1 / Qframe.loc[smp]) dQ_ra2_Ln = dQmat.mul(ra2ochLn) Chiuvalue = math.exp(-0.5 * sum(dQ_ra2_Ln.sum())) out.loc[smp] = Chiuvalue else: for smp in ra.columns: ra2mat = pd.DataFrame(np.outer(ra[smp].to_numpy(), ra[smp].to_numpy()), index=ra.index, columns=ra.index) mask = ra2mat > 0 ra2mat[mask] = ra2mat[mask].pow(q) dQmat = distmat.mul(1 / Qframe.loc[smp]) ra2dq = (ra2mat.mul(dQmat)) Chiuvalue = pow(sum(ra2dq.sum()), 1 / (2 * (1 - q))) out.loc[smp] = Chiuvalue if index == 'D': return out elif index == 'MD': MD = out.mul(Qframe) return MD elif index == 'FD': MD = out.mul(Qframe) return out.mul(MD) else: raise ValueError("`index` must be one of: 'D', 'MD', 'FD'.")
# ----------------------------------------------------------------------------- # Phylo indices # -----------------------------------------------------------------------------
[docs] def mpdq( obj: Union[Dict[str, Any], Any], distmat: pd.DataFrame, *, q: float = 1.0, ) -> pd.DataFrame: """ Mean phylogenetic distance (MPD) with q-weighting of relative abundances. Accepts either a MicrobiomeData object or a dict with at least a 'tab' DataFrame. Parameters ---------- obj : MicrobiomeData, dict, or compatible object Input data. Must provide at least an abundance table ('tab'). distmat : pd.DataFrame Square distance matrix indexed/columned by feature ids. q : float, default=1.0 Order of diversity weighting applied to relative abundances. Returns ------- pandas.DataFrame References ---------- Webb et al. (2002) *American Naturalist*. """ from ..model import nriq return nriq(obj, distmat, q=q, iterations=0)
[docs] def mntdq( obj: Union[Dict[str, Any], Any], distmat: pd.DataFrame, *, q: float = 1.0, ) -> pd.DataFrame: """ Mean nearest taxon distance (MNTD) with q-weighting of relative abundances. Parameters ---------- obj : MicrobiomeData, dict, or compatible object Input data. Must provide at least an abundance table ('tab'). distmat : pd.DataFrame Square distance matrix indexed/columned by feature ids. q : float, default=1.0 Order of diversity weighting applied to relative abundances. Returns ------- pandas.DataFrame """ from ..model import ntiq return ntiq(obj, distmat, q=q, iterations=0)