import pandas as pd
import numpy as np
import math
from typing import Optional, Dict, Any, Union
from ..io import subset_samples
from ..utils import rao, beta2dist, get_df
from ..utils import subset_tree_df, ra_to_branches, compute_Tmean
from .alpha_div import naive_alpha, phyl_alpha, func_alpha
def _get_tqdm(use_tqdm: bool):
"""
Internal helper that returns tqdm if available and requested; otherwise provides
a minimal stub compatible with tqdm's API.
"""
if use_tqdm:
try:
from tqdm import tqdm
return tqdm
except Exception:
pass
class _DummyTqdm: # fallback with same constructor signature
def __init__(self, iterable=None, total=None, desc=None, unit=None, leave=False):
self._iterable = iterable if iterable is not None else range(total or 0)
def __iter__(self):
return iter(self._iterable)
def update(self, *_args, **_kwargs):
return None
def close(self):
return None
return _DummyTqdm
# -----------------------------------------------------------------------------
# Naive beta diversity
# -----------------------------------------------------------------------------
[docs]
def naive_beta(
tab: Union[pd.DataFrame, Dict[str, Any], Any],
*,
q: float = 1,
dis: bool = True,
viewpoint: str = "regional",
use_values_in_tab: bool = False
) -> pd.DataFrame:
"""
Compute naive (taxonomic) pairwise beta diversity of order *q*.
Implements the two‑community Hill‑number beta diversity framework
described in Chao et al. (2014), using only species abundances
(no phylogenetic or functional information).
For two samples A and B:
α_q = Hill number of the average of A and B
γ_q = Hill number of the pooled community
β_q = γ_q / α_q
Special case q = 1 uses the Shannon limit:
α₁ = exp( -½ Σ pᵢ ln pᵢ - ½ Σ qᵢ ln qᵢ )
γ₁ = exp( -Σ mᵢ ln mᵢ )
Parameters
----------
tab : DataFrame | MicrobiomeData-like | dict
Abundance table (features x samples) or convertible structure.
q : float, default=1
Diversity order.
dis : bool, default=True
If True, convert β to a dissimilarity using `beta2dist`.
If False, return raw β values.
viewpoint : {'local', 'regional'}, default='regional'
Viewpoint for converting β to dissimilarity.
use_values_in_tab : bool, default=False
If False, convert abundances to relative abundances.
If True, assume `tab` already contains relative abundances.
Returns
-------
pandas.DataFrame
Pairwise β-diversity (or dissimilarity) matrix.
Notes
-----
- Requires `beta2dist()` to be defined elsewhere.
- Only works for ≥ 2 samples.
"""
# Validate input
tab = get_df(tab, "tab")
# Ensure numeric
try:
tab = tab.astype(float)
except Exception as e:
raise TypeError(
"Abundance table contains non-numeric values. Ensure counts/abundances are numeric."
) from e
# --- Relative abundances --------------------------------------------------
if use_values_in_tab:
ra = tab
else:
col_sums = tab.sum(axis=0)
if (col_sums == 0).any():
bad = col_sums.index[col_sums == 0].tolist()
raise ValueError(f"One or more samples have zero total abundance: {bad}")
ra = tab.div(col_sums, axis=1)
if isinstance(ra, pd.Series) or ra.shape[1] < 2:
raise ValueError("`tab` must contain ≥ 2 samples (columns).")
# Check for duplicate sample names
smplist = ra.columns.tolist()
if len(smplist) != len(set(smplist)):
raise ValueError("Duplicate sample names detected in `tab`.")
# Output matrix
out = pd.DataFrame(0.0, index=smplist, columns=smplist)
# Pairwise beta diversity
for i in range(len(smplist) - 1):
s1 = smplist[i]
p1 = ra[s1]
for j in range(i + 1, len(smplist)):
s2 = smplist[j]
p2 = ra[s2]
# q = 1 (Shannon case)
if q == 1:
# α-diversity
mask1 = p1 > 0
H1 = (p1[mask1] * np.log(p1[mask1])).sum()
mask2 = p2 > 0
H2 = (p2[mask2] * np.log(p2[mask2])).sum()
alpha = math.exp(-0.5 * H1 - 0.5 * H2)
# γ-diversity
m = (p1 + p2) / 2
maskg = m > 0
Hg = (m[maskg] * np.log(m[maskg])).sum()
gamma = math.exp(-Hg)
beta = gamma / alpha
# q ≠ 1 (General Hill case)
else:
# α-diversity
p1q = p1[p1 > 0].pow(q).sum()
p2q = p2[p2 > 0].pow(q).sum()
alpha = (0.5 * p1q + 0.5 * p2q) ** (1.0 / (1.0 - q))
# γ-diversity
m = (p1 + p2) / 2
mq = m[m > 0].pow(q).sum()
gamma = mq ** (1.0 / (1.0 - q))
beta = gamma / alpha
out.loc[s1, s2] = beta
out.loc[s2, s1] = beta
# Convert β to dissimilarity if requested
if dis:
return beta2dist(beta=out, q=q, N=2, div_type="naive", viewpoint=viewpoint)
return out
# -----------------------------------------------------------------------------
# Phylogenetic beta diversity
# -----------------------------------------------------------------------------
[docs]
def phyl_beta(
obj: Union[Dict[str, Any], Any],
*,
q: float = 1,
dis: bool = True,
viewpoint: str = "regional",
use_values_in_tab: bool = False
) -> pd.DataFrame:
"""
Compute phylogenetic pairwise beta diversity of order *q*.
Implements the two‑community phylogenetic Hill‑number beta framework
described in Chao et al. (2014), where branch lengths are weighted by
the relative abundances of all features descending from each branch.
For two samples A and B:
α_q = phylogenetic Hill number of the average of A and B
γ_q = phylogenetic Hill number of the pooled community
β_q = γ_q / α_q
Special case q = 1 uses the Shannon limit.
Parameters
----------
obj : MicrobiomeData-like | dict
Must provide:
- 'tab': feature × sample abundance DataFrame
- 'tree': branch × columns DataFrame with:
* 'leaves' : iterable/list of leaf IDs under each branch
* 'branchL': branch length (float)
q : float, default=1
Diversity order.
dis : bool, default=True
If True, convert β to a dissimilarity using `beta2dist`.
viewpoint : {'local', 'regional'}, default='regional'
Viewpoint for converting β to dissimilarity.
use_values_in_tab : bool, default=False
If False, convert abundances to relative abundances.
If True, assume `tab` already contains relative abundances.
Returns
-------
pandas.DataFrame
Pairwise phylogenetic β-diversity (or dissimilarity) matrix.
Notes
-----
- Requires `beta2dist()` to be defined elsewhere.
- Only works for ≥ 2 samples.
"""
tab = get_df(obj, "tab")
tree = get_df(obj, "tree")
if "leaves" not in tree.columns or "branchL" not in tree.columns:
raise ValueError("`tree` must contain columns 'leaves' and 'branchL'.")
# Ensure numeric
try:
tab = tab.astype(float)
except Exception as e:
raise TypeError("Abundance table contains non-numeric values.") from e
# --- Relative abundances --------------------------------------------------
if use_values_in_tab:
ra = tab
else:
col_sums = tab.sum(axis=0)
if (col_sums == 0).any():
bad = col_sums.index[col_sums == 0].tolist()
raise ValueError(f"One or more samples have zero total abundance: {bad}")
ra = tab.div(col_sums, axis=1)
if isinstance(ra, pd.Series) or ra.shape[1] < 2:
raise ValueError("`tab` must contain ≥ 2 samples (columns).")
#Subset tree to features in tab
tree = subset_tree_df(tree, ra.index.tolist())
tree2 = ra_to_branches(ra, tree)
# Align branch lengths to tree2 index
branchL = tree["branchL"].reindex(tree2.index)
if branchL.isna().any():
missing = branchL.index[branchL.isna()].tolist()
raise ValueError(f"'branchL' missing for branches: {missing}")
# --- Pairwise phylogenetic beta diversity --------------------------------
smplist = ra.columns.tolist()
out = pd.DataFrame(0.0, index=smplist, columns=smplist)
for i in range(len(smplist) - 1):
s1 = smplist[i]
for j in range(i + 1, len(smplist)):
s2 = smplist[j]
# Subtree abundances per branch
sub = tree2[[s1, s2]].copy()
pooled = str(s1)+str(s2)
sub[pooled] = sub.mean(axis=1)
Tmean = compute_Tmean(tree, sub)
Tgamma = Tmean[pooled]
# --- γ-diversity ---------------------------------------------------
g = sub[pooled]
if abs(q - 1.0) < 1e-6:
mask = g > 0
term = g.where(mask, 0.0) * np.log(g.where(mask, 1.0))
term = (term * (branchL / Tgamma)).sum()
gamma_div = math.exp(-term)
elif q == 0:
occupied_gamma = (g > 0).astype(float)
gamma_div = (occupied_gamma.mul(branchL)).sum() / Tgamma
else:
term = (branchL * g.clip(lower=0).pow(q)).sum() / Tgamma
gamma_div = term ** (1.0 / (1.0 - q))
# --- α-diversity ---------------------------------------------------
a1 = sub[s1]
a2 = sub[s2]
if abs(q - 1.0) < 1e-6:
# Shannon limit
mask1 = a1 > 0
mask2 = a2 > 0
H = -(
(a1[mask1] * np.log(a1[mask1]) + a2[mask2] * np.log(a2[mask2]))
.mul(branchL, axis=0)
.sum()
/ (2.0 * Tgamma)
)
alpha_div = math.exp(H)
elif q == 0:
pos_counts = ((a1 > 0).astype(float) + (a2 > 0).astype(float))
alpha_div = (branchL * pos_counts).sum() / (2.0 * Tgamma)
else:
term = (
branchL *
((a1.clip(lower=0).pow(q) + a2.clip(lower=0).pow(q)) / 2.0)
).sum() / Tgamma
alpha_div = term ** (1.0 / (1.0 - q))
# β-diversity
beta_val = gamma_div / alpha_div
out.loc[s1, s2] = beta_val
out.loc[s2, s1] = beta_val
# --- Convert β to dissimilarity if requested ------------------------------
if dis:
return beta2dist(beta=out, q=q, N=2, div_type="phyl", viewpoint=viewpoint)
return out
# -----------------------------------------------------------------------------
# Functional beta diversity
# -----------------------------------------------------------------------------
[docs]
def func_beta(
tab: Union[pd.DataFrame, Dict[str, Any], Any],
distmat: pd.DataFrame,
*,
q: float = 1,
dis: bool = True,
viewpoint: str = "regional",
use_values_in_tab: bool = False,
use_tqdm: bool = True,
) -> pd.DataFrame:
"""
Compute functional pairwise beta diversity of order *q*.
Implements the two‑community functional Hill‑number beta framework
based on local functional overlaps as in Chao et al. (2014). Functional
diversity is derived from pairwise trait distances between ASVs and
their abundances.
For each pair of samples (A, B), the method computes:
- Dg : functional Hill number for the pooled community (gamma)
- Da : functional Hill number for the "average" community (alpha)
- beta = Dg / Da
For q = 1, the Shannon-type limit is used; for q ≠ 1, the general
Hill-number form is used.
Parameters
----------
tab : DataFrame | MicrobiomeData-like | dict
Abundance table (features x samples) or convertible structure.
distmat : pandas.DataFrame
Functional distance matrix (ASVs × ASVs), symmetric and
indexed by the same ASVs as `tab`.
q : float, default=1
Diversity order.
dis : bool, default=True
If True, convert β to a dissimilarity using `beta2dist`.
viewpoint : {'local', 'regional'}, default='regional'
Viewpoint for converting β to dissimilarity.
use_values_in_tab : bool, default=False
If False, convert abundances to relative abundances.
If True, assume `tab` already contains relative abundances.
use_tqdm : bool, default=True
Use `tqdm` for progress bars.
Returns
-------
pandas.DataFrame
Pairwise functional dissimilarity matrix (if `dis=True`) or
squared functional beta (β²) matrix (if `dis=False`).
Notes
-----
- Only works for ≥ 2 samples.
"""
# Get input
tab = get_df(tab, "tab")
if not isinstance(distmat, pd.DataFrame):
raise TypeError("`distmat` must be a pandas DataFrame.")
# Ensure numeric
try:
tab = tab.astype(float)
except Exception as e:
raise TypeError(
"Abundance table contains non-numeric values. Ensure counts/abundances are numeric."
) from e
# --- Relative abundances --------------------------------------------------
if use_values_in_tab:
ra = tab
else:
col_sums = tab.sum(axis=0)
if (col_sums == 0).any():
bad = col_sums.index[col_sums == 0].tolist()
raise ValueError(f"One or more samples have zero total abundance: {bad}")
ra = tab.div(col_sums, axis=1)
if isinstance(ra, pd.Series) or ra.shape[1] < 2:
raise ValueError("`tab` must contain ≥ 2 samples (columns).")
# Align distance matrix to features
asvs = ra.index.tolist()
distmat = distmat.loc[asvs, asvs]
smplist = list(ra.columns)
outD = pd.DataFrame(0.0, index=smplist, columns=smplist)
# Pairwise functional beta diversity
tqdm = _get_tqdm(use_tqdm)
for i in tqdm(range(len(smplist) - 1), desc="func_beta", unit="sample"):
for j in range(i + 1, len(smplist)):
s1 = smplist[i]
s2 = smplist[j]
# Subset abundances for the two samples
ra12 = ra[[s1, s2]].copy()
ra12["mean"] = ra12.mean(axis=1)
# Rao's Q for each column and for the mean
Qvals = rao(ra12, distmat)
Q_pooled = Qvals["mean"]
dqmat = distmat * (1.0 / Q_pooled)
# -------------------------
# Gamma component (Dg)
# -------------------------
mask_g = ra12["mean"] > 0
p_mean = ra12.loc[mask_g, "mean"].to_numpy()
outer_mean = np.outer(p_mean, p_mean)
if q == 1:
# Shannon-type functional gamma
log_outer = np.log(outer_mean)
term = outer_mean * log_outer
# dqmat restricted to nonzero rows/cols
d_sub = dqmat.loc[mask_g, mask_g].to_numpy()
Dg = math.exp(-0.5 * np.sum(term * d_sub))
else:
outer_q = outer_mean ** q
d_sub = dqmat.loc[mask_g, mask_g].to_numpy()
val = np.sum(outer_q * d_sub)
Dg = val ** (1.0 / (2.0 * (1.0 - q)))
# -------------------------
# Alpha component (Da)
# -------------------------
# A: p1 × p1
mask1 = ra12[s1] > 0
p1 = ra12.loc[mask1, s1].to_numpy()
outer11 = np.outer(p1, p1) / 4.0
d11 = dqmat.loc[mask1, mask1].to_numpy()
# B: p2 × p2
mask2 = ra12[s2] > 0
p2 = ra12.loc[mask2, s2].to_numpy()
outer22 = np.outer(p2, p2) / 4.0
d22 = dqmat.loc[mask2, mask2].to_numpy()
# C: p1 × p2
# note: indices differ; use full submatrix
outer12 = np.outer(p1, p2) / 4.0
d12 = dqmat.loc[mask1, mask2].to_numpy()
if q == 1:
# Shannon-type functional alpha
# A
log11 = np.log(outer11)
term11 = outer11 * log11 * d11
asum1 = term11.sum()
# B
log22 = np.log(outer22)
term22 = outer22 * log22 * d22
asum2 = term22.sum()
# C
log12 = np.log(outer12)
term12 = outer12 * log12 * d12
asum12 = term12.sum()
Da = 0.5 * math.exp(-0.5 * (asum1 + asum2 + 2.0 * asum12))
else:
# General q alpha
a11_q = outer11 ** q
asum1 = np.sum(a11_q * d11)
a22_q = outer22 ** q
asum2 = np.sum(a22_q * d22)
a12_q = outer12 ** q
asum12 = np.sum(a12_q * d12)
Da = 0.5 * (asum1 + asum2 + 2.0 * asum12) ** (1.0 / (2.0 * (1.0 - q)))
# -------------------------
# Beta component
# -------------------------
beta_val = Dg / Da
outD.loc[s1, s2] = beta_val
outD.loc[s2, s1] = beta_val
# Square β to get FD-like measure
outFD = outD.pow(2)
# Convert β to dissimilarity if requested
if dis:
return beta2dist(beta=outFD, q=q, N=2, div_type="func", viewpoint=viewpoint)
return outFD
# -----------------------------------------------------------------------------
# Bray-Curtis
# -----------------------------------------------------------------------------
[docs]
def bray(
tab: Union[pd.DataFrame, Dict[str, Any], Any],
*,
use_values_in_tab: bool = False
) -> pd.DataFrame:
"""
Compute the Bray–Curtis dissimilarity matrix between all samples.
Bray–Curtis dissimilarity between two samples A and B is:
BC(A, B) = 1 − Σ_i min(p_iA, p_iB)
where p_iA and p_iB are relative abundances of feature i in samples A and B.
Parameters
----------
tab : DataFrame | MicrobiomeData-like | dict
Abundance table (features x samples) or convertible structure.
use_values_in_tab : bool, default=False
If False, convert abundances to relative abundances.
If True, assume `tab` already contains relative abundances.
Returns
-------
pandas.DataFrame
Symmetric Bray–Curtis dissimilarity matrix.
Notes
-----
- Requires at least two samples.
- Zero-sum samples are not allowed unless `use_values_in_tab=True`.
"""
# --- Validate input ------------------------------------------------------
tab = get_df(tab, "tab")
if tab.shape[1] < 2:
raise ValueError("`tab` must contain at least two samples (columns).")
# Ensure numeric
try:
tab = tab.astype(float)
except Exception as e:
raise TypeError(
"Abundance table contains non-numeric values. Ensure counts/abundances are numeric."
) from e
# --- Relative abundances --------------------------------------------------
if use_values_in_tab:
ra = tab
else:
col_sums = tab.sum(axis=0)
if (col_sums == 0).any():
bad = col_sums.index[col_sums == 0].tolist()
raise ValueError(f"One or more samples have zero total abundance: {bad}")
ra = tab.div(col_sums, axis=1)
# --- Compute Bray–Curtis -------------------------------------------------
samples = ra.columns.tolist()
out = pd.DataFrame(0.0, index=samples, columns=samples)
# Efficient pairwise computation
for i in range(len(samples) - 1):
s1 = samples[i]
p1 = ra[s1]
for j in range(i + 1, len(samples)):
s2 = samples[j]
p2 = ra[s2]
# BC = 1 − Σ min(p_iA, p_iB)
bc = 1.0 - np.minimum(p1, p2).sum()
out.loc[s1, s2] = bc
out.loc[s2, s1] = bc
return out
# -----------------------------------------------------------------------------
# Jaccard
# -----------------------------------------------------------------------------
[docs]
def jaccard(
tab: Union[pd.DataFrame, Dict[str, Any], Any],
*,
use_values_in_tab: bool = False
) -> pd.DataFrame:
"""
Compute the Jaccard dissimilarity matrix between all samples.
Jaccard dissimilarity between two samples A and B is:
J(A, B) = 1 − ( |A ∩ B| / |A ∪ B| )
where presence/absence is determined by whether abundance > 0.
Parameters
----------
tab : DataFrame | MicrobiomeData-like | dict
Abundance table (features x samples) or convertible structure.
use_values_in_tab : bool, default=False
Ignored for Jaccard (presence/absence only), included for API symmetry.
Returns
-------
pandas.DataFrame
Symmetric Jaccard dissimilarity matrix.
Notes
-----
- Requires at least two samples.
- Abundances are converted to binary presence/absence.
"""
# --- Validate input ------------------------------------------------------
tab = get_df(tab, "tab")
if tab.shape[1] < 2:
raise ValueError("`tab` must contain at least two samples (columns).")
# Ensure numeric
try:
tab = tab.astype(float)
except Exception as e:
raise TypeError(
"Abundance table contains non-numeric values. Ensure counts/abundances are numeric."
) from e
# --- Convert to presence/absence ----------------------------------------
bintab = (tab > 0).astype(int)
samples = bintab.columns.tolist()
out = pd.DataFrame(0.0, index=samples, columns=samples)
# --- Compute Jaccard dissimilarity --------------------------------------
for i in range(len(samples) - 1):
s1 = samples[i]
v1 = bintab[s1]
for j in range(i + 1, len(samples)):
s2 = samples[j]
v2 = bintab[s2]
shared = np.sum((v1 == 1) & (v2 == 1))
total = np.sum((v1 == 1) | (v2 == 1))
# Avoid division by zero (no species in either sample)
jac = 1.0 if total == 0 else 1.0 - shared / total
out.loc[s1, s2] = jac
out.loc[s2, s1] = jac
return out
# -----------------------------------------------------------------------------
# Naive beta diversity for multiple samples
# -----------------------------------------------------------------------------
[docs]
def naive_multi_beta(
obj: Union[Dict[str, Any], Any],
*,
by: Optional[str] = None,
q: float = 1,
) -> pd.DataFrame:
"""
Compute naive (taxonomic) multi‑sample beta diversity for groups of samples.
This implements the multi‑sample Hill‑number beta framework:
β_q = γ_q / ( α_q / N )
where:
- γ_q is the Hill number of the pooled community
- α_q is the mean within‑sample Hill number
- N is the number of samples in the group
Parameters
----------
obj : MicrobiomeData-like | dict
Must contain:
- 'meta' : pandas.DataFrame with sample metadata
- 'tab' : pandas.DataFrame with feature counts (features × samples)
by : str or None, default=None
Column in metadata defining sample groups.
If None, all samples are treated as one group.
q : float, default=1
Diversity order.
Returns
-------
pandas.DataFrame
Index = categories in `var` (or 'all' if var=None)
Columns:
- N : number of samples in group
- beta : multi‑sample beta diversity
- local_dis : local‑viewpoint dissimilarity
- regional_dis : regional‑viewpoint dissimilarity
Notes
-----
- Groups with <2 samples return NaN.
"""
# Validate input
meta = get_df(obj, "meta")
tab = get_df(obj, "tab")
if tab.shape[1] < 2:
raise ValueError("At least two samples are required.")
# Build dictionary of subtables by category
if by is None:
categories = ["all"]
tabdict = {"all": tab.copy()}
else:
if by not in meta.columns:
raise ValueError(f"Column '{by}' not found in metadata.")
categories = meta[by].unique().tolist()
tabdict = {
cat: get_df(subset_samples(obj, by=by, values=[cat]), "tab")
for cat in categories
}
# Output container
out = pd.DataFrame(
np.nan,
index=categories,
columns=["N", "beta", "local_dis", "regional_dis"]
)
# Compute multi‑sample beta for each category
for cat in categories:
subtab = tabdict[cat]
# Need at least 2 samples
if subtab.shape[1] < 2:
continue
N = subtab.shape[1]
out.loc[cat, "N"] = N
# Convert to relative abundances if needed
col_sums = subtab.sum()
if (col_sums == 0).any():
raise ValueError(f"Group '{cat}' contains a zero‑sum sample.")
ra = subtab.div(col_sums)
# Build alpha/gamma table for naive_alpha()
# gamma row = pooled abundances
gamma_row = ra.sum(axis=1).to_numpy()
# alpha rows = each sample's abundances
alpha_rows = ra.to_numpy().T.reshape(-1)
df_temp = pd.DataFrame({
"gamma": np.concatenate([gamma_row, np.zeros_like(alpha_rows)]),
"alpha": np.concatenate([np.zeros_like(gamma_row), alpha_rows])
})
# Compute alpha and gamma Hill numbers
divs = naive_alpha(df_temp, q=q, use_values_in_tab=False)
gamma_div = divs["gamma"]
alpha_div = divs["alpha"] / N
beta = gamma_div / alpha_div
out.loc[cat, "beta"] = beta
# Convert to dissimilarities
out.loc[cat, "local_dis"] = beta2dist(
beta, q=q, N=N, div_type="naive", viewpoint="local"
)
out.loc[cat, "regional_dis"] = beta2dist(
beta, q=q, N=N, div_type="naive", viewpoint="regional"
)
return out
# -----------------------------------------------------------------------------
# Phylogenetic beta diversity for multiple samples
# -----------------------------------------------------------------------------
[docs]
def phyl_multi_beta(
obj: Union[Dict[str, Any], Any],
*,
by: Optional[str] = None,
q: float = 1,
) -> pd.DataFrame:
"""
Compute phylogenetic multi‑sample beta diversity for groups of samples.
Implements the multi‑sample phylogenetic Hill‑number beta framework
described in Chao et al. (2014), where branch lengths are weighted by
the relative abundances of all ASVs descending from each branch.
For each group of samples:
β_q = γ_q / ( α_q / N )
where:
- γ_q is the phylogenetic Hill number of the pooled community
- α_q is the mean within‑sample phylogenetic Hill number
- N is the number of samples in the group
Parameters
----------
obj : MicrobiomeData-like | dict
Must contain:
- 'meta' : pandas.DataFrame with sample metadata
- 'tab' : pandas.DataFrame with ASV counts (ASVs × samples)
- 'tree' : pandas.DataFrame with:
* 'leaves' : list of features under each branch
* 'branchL' : branch length
by : str or None, default=None
Metadata column defining sample groups.
If None, all samples are treated as one group.
q : float, default=1
Diversity order.
Returns
-------
pandas.DataFrame
Index = categories in `by` (or 'all' if by=None)
Columns:
- N : number of samples in group
- beta : multi‑sample phylogenetic beta diversity
- local_dis : local‑viewpoint dissimilarity
- regional_dis : regional‑viewpoint dissimilarity
Notes
-----
- Only works for ≥ 2 samples per group.
"""
# Validate input
tab = get_df(obj, "tab")
meta = get_df(obj, "meta")
tree = get_df(obj, "tree")
if tab.shape[1] < 2:
raise ValueError("At least two samples are required.")
if "leaves" not in tree.columns or "branchL" not in tree.columns:
raise ValueError("`tree` must contain columns 'leaves' and 'branchL'.")
#Subset tree to features in tab
tree = subset_tree_df(tree, tab.index.tolist())
# Build dictionary of subtables by category
if by is None:
categories = ["all"]
tabdict = {"all": tab.copy()}
else:
if by not in meta.columns:
raise ValueError(f"Column '{by}' not found in metadata.")
categories = meta[by].unique().tolist()
tabdict = {
cat: get_df(subset_samples(obj, by=by, values=[cat]), "tab")
for cat in categories
}
# Output container
out = pd.DataFrame(
np.nan,
index=categories,
columns=["N", "beta", "local_dis", "regional_dis"]
)
# Compute multi‑sample phylogenetic beta for each category
for cat in categories:
subtab = tabdict[cat]
# Need at least 2 samples
if subtab.shape[1] < 2:
continue
N = subtab.shape[1]
out.loc[cat, "N"] = N
# Relative abundances
col_sums = subtab.sum()
if (col_sums == 0).any():
raise ValueError(f"Group '{cat}' contains a zero‑sum sample.")
ra = subtab.div(col_sums)
# Build branch × sample abundance matrix
tree2 = ra_to_branches(ra, tree)
# --- Compute Tavg = Σ_b L_b * mean(p_b) -------------------------------------
# Align branch lengths to tree2 (branch × sample) and ensure numeric
branchL = pd.to_numeric(tree["branchL"], errors="raise").reindex(tree2.index)
if branchL.isna().any():
missing = branchL.index[branchL.isna()].tolist()
raise ValueError(f"'branchL' missing for branches: {missing}")
mean_ra = tree2.mean(axis=1) # γ_b: mean of per-branch RA across N samples
Tavg = float(mean_ra.mul(branchL).sum()) # Σ L_b * γ_b
# --- γ-diversity -------------------------------------------------------------
g = mean_ra
if abs(q - 1.0) < 1e-6:
mask = g > 0
H = -((g[mask] * np.log(g[mask]) * branchL[mask]).sum() / Tavg)
gamma_div = math.exp(H)
elif q == 0:
gamma_div = branchL[g > 0].sum() / Tavg
else:
term = (branchL * g.clip(lower=0).pow(q)).sum() / Tavg
gamma_div = term ** (1.0 / (1.0 - q))
# --- α-diversity -------------------------------------------------------------
K = tree2.shape[1]
if abs(q - 1.0) < 1e-6:
mask = tree2 > 0
term = (
(tree2[mask] * np.log(tree2[mask]))
.mul(branchL, axis=0)
.sum().sum()
/ (K * Tavg)
)
alpha_div = math.exp(-term)
elif q == 0:
pos_counts = (tree2 > 0).sum(axis=1).astype(float)
alpha_div = (branchL * pos_counts).sum() / (K * Tavg)
else:
term = (
branchL *
(tree2.clip(lower=0).pow(q).sum(axis=1) / K)
).sum() / Tavg
alpha_div = term ** (1.0 / (1.0 - q))
# β-diversity
beta = gamma_div / alpha_div
out.loc[cat, "beta"] = beta
out.loc[cat, "local_dis"] = beta2dist(
beta=beta, q=q, N=N, div_type="phyl", viewpoint="local"
)
out.loc[cat, "regional_dis"] = beta2dist(
beta=beta, q=q, N=N, div_type="phyl", viewpoint="regional"
)
return out
# -----------------------------------------------------------------------------
# Functional beta diversity for multiple samples
# -----------------------------------------------------------------------------
[docs]
def func_multi_beta(
obj: Union[Dict[str, Any], Any],
distmat: pd.DataFrame,
*,
by: Optional[str] = None,
q: float = 1,
) -> pd.DataFrame:
"""
Compute functional multi‑sample beta diversity for groups of samples.
Implements the multi‑sample functional Hill‑number beta framework
described in Chiu et al. (2014), where functional diversity is derived
from pairwise trait distances and species abundances.
For each group of samples:
β_q = D_gamma / D_alpha
where:
- D_gamma is the functional Hill number of the pooled community
- D_alpha is the mean functional Hill number across all sample pairs
- N is the number of samples in the group
- NxN = N² (number of ordered sample pairs)
Parameters
----------
obj : MicrobiomeData-like | dict
Must contain:
- 'meta' : pandas.DataFrame with sample metadata
- 'tab' : pandas.DataFrame (features × samples)
distmat : pandas.DataFrame
Functional distance matrix (features × features).
by : str or None, default=None
Metadata column defining sample groups.
If None, all samples are treated as one group.
q : float, default=1
Diversity order.
Returns
-------
pandas.DataFrame
Index = categories in `by` (or 'all' if by=None)
Columns:
- NxN : N² (number of ordered sample pairs)
- beta : functional multi‑sample beta diversity
- local_dis : local‑viewpoint dissimilarity
- regional_dis : regional‑viewpoint dissimilarity
Notes
-----
- Only works for ≥ 2 samples per group.
"""
# Validate input
tab = get_df(obj, "tab")
meta = get_df(obj, "meta")
if tab.shape[1] < 2:
raise ValueError("At least two samples are required.")
# Make sure tab and distmat have the same index
in_common = list(set(distmat.index).intersection(tab.index))
if len(in_common) < len(tab):
raise ValueError("Features in tab are missing in distmat.")
tab = tab.loc[in_common]
distmat = distmat.loc[in_common, in_common].copy()
# Build dictionary of subtables by category
if by is None:
categories = ["all"]
tabdict = {"all": tab.copy()}
else:
if by not in meta.columns:
raise ValueError(f"Column '{by}' not found in metadata.")
categories = meta[by].unique().tolist()
tabdict = {
cat: get_df(subset_samples(obj, by=by, values=[cat], keep_absent=True), "tab")
for cat in categories
}
# Output container
out = pd.DataFrame(
np.nan,
index=categories,
columns=["NxN", "beta", "local_dis", "regional_dis"]
)
# Compute multi‑sample functional beta for each category
for cat in categories:
subtab = tabdict[cat]
# Need at least 2 samples
if subtab.shape[1] < 2:
continue
N = subtab.shape[1]
out.loc[cat, "NxN"] = N * N
# Relative abundances
col_sums = subtab.sum()
if (col_sums == 0).any():
raise ValueError(f"Group '{cat}' contains a zero‑sum sample.")
ra = subtab.div(col_sums)
smplist = ra.columns.tolist()
# Compute pooled mean abundances
ra_mean = ra.mean(axis=1)
# Rao's Q for pooled community
Q_pooled = rao(ra_mean, distmat)
dqmat = distmat * (1.0 / Q_pooled)
# γ-diversity (pooled)
p = ra_mean.to_numpy()
outer = np.outer(p, p)
if q == 1:
mask = outer > 0
log_outer = np.zeros_like(outer)
log_outer[mask] = np.log(outer[mask])
term = outer * log_outer
Dg = math.exp(-0.5 * np.sum(term * dqmat.values))
else:
mask = outer > 0
outer_q = np.zeros_like(outer)
outer_q[mask] = outer[mask] ** q
Dg = (np.sum(outer_q * dqmat.values)) ** (1.0 / (2.0 * (1.0 - q)))
# α-diversity (mean over all ordered sample pairs)
asum = 0.0
for s1 in smplist:
p1 = ra[s1].to_numpy()
for s2 in smplist:
p2 = ra[s2].to_numpy()
outer12 = np.outer(p1, p2) / (N * N)
if q == 1:
mask = outer12 > 0
log_outer = np.zeros_like(outer12)
log_outer[mask] = np.log(outer12[mask])
term = outer12 * log_outer
asum += np.sum(term * dqmat.values)
else:
mask = outer12 > 0
outer_q = np.zeros_like(outer12)
outer_q[mask] = outer12[mask] ** q
asum += np.sum(outer_q * dqmat.values)
if q == 1:
Da = (1.0 / N) * math.exp(-0.5 * asum)
else:
Da = (1.0 / N) * (asum ** (1.0 / (2.0 * (1.0 - q))))
# β-diversity
beta = Dg / Da
out.loc[cat, "beta"] = beta
out.loc[cat, "local_dis"] = beta2dist(
beta=beta, q=q, N=N, div_type="func", viewpoint="local"
)
out.loc[cat, "regional_dis"] = beta2dist(
beta=beta, q=q, N=N, div_type="func", viewpoint="regional"
)
return out
# -----------------------------------------------------------------------------
# Evenness
# -----------------------------------------------------------------------------
[docs]
def evenness(
obj: Union[pd.DataFrame, Dict[str, Any], Any],
distmat: Optional[pd.DataFrame] = None,
*,
q: float = 1,
div_type: str = "naive",
index: str = "pielou",
perspective: str = "samples",
use_values_in_tab: bool = False
) -> pd.Series:
"""
Compute evenness measures from Chao & Ricotta (2019, Ecology 100:e02852),
with optional support for Pielou’s classical evenness index.
Supports:
- naive (taxonomic) evenness
- phylogenetic evenness
- functional evenness
Supported evenness indices:
- CR1 (regional evenness)
- CR2 (local evenness)
- CR3
- CR4
- CR5
- pielou (Pielou’s J; defined only for q = 1)
Parameters
----------
obj : DataFrame | MicrobiomeData-like | dict
Including abundance table (features × samples) and optionally
tree (pandas.DataFrame, required if divType='phyl')
distmat : pandas.DataFrame, optional
Required if divType='func'. Functional distance matrix.
q : float, default=1
Diversity order.
div_type : {'naive', 'phyl', 'func'}
Type of diversity measure used to compute D.
index : {'CR1','CR2','CR3','CR4','CR5','local','regional','pielou'}
Evenness index to compute.
perspective : {'samples','taxa'}
Whether to compute evenness across samples (columns)
or across taxa/branches (rows).
use_values_in_tab : bool, default=False
If False, convert abundances to relative abundances.
Returns
-------
pandas.Series
Evenness values indexed by sample or taxon.
Notes
-----
- CR1 = regional evenness
- CR2 = local evenness
- CR3–CR5 are alternative evenness formulations from Chao & Ricotta (2019)
- Pielou’s index is included for convenience and corresponds to:
J = H' / ln(S) = ln(D₁) / ln(S)
where D₁ is the Hill number of order q = 1.
"""
# Validate inputs
tab = get_df(obj, "tab")
if perspective not in ("samples", "taxa"):
raise ValueError("perspective must be 'samples' or 'taxa'.")
if index in ("CR1", "regional"):
power = 1 - q
elif index in ("CR2", "local"):
power = q - 1
else:
power = None # used only for CR3–CR5
# Compute S (richness) and D (Hill number)
if perspective == "samples":
# Richness per sample
S_series = (tab > 0).sum(axis=0)
# Diversity per sample
if div_type == "naive":
D_series = naive_alpha(tab, q=q, use_values_in_tab=use_values_in_tab)
elif div_type == "phyl":
tree = get_df(obj, "tree")
if not isinstance(tree, pd.DataFrame):
raise ValueError("tree must be provided for div_type='phyl'.")
D_series = phyl_alpha(obj, q=q, index="D",
use_values_in_tab=use_values_in_tab)
elif div_type == "func":
if not isinstance(distmat, pd.DataFrame):
raise ValueError("distmat must be provided for div_type='func'.")
D_series = func_alpha(tab, distmat, q=q, index="D",
use_values_in_tab=use_values_in_tab)
else:
raise ValueError("divType must be 'naive', 'phyl', or 'func'.")
# Perspective = taxa
else:
if div_type == "naive":
tabT = tab.T
S_series = tabT.count().astype(float)
D_series = naive_alpha(tabT, q=q, use_values_in_tab=use_values_in_tab)
elif div_type == "phyl":
tree = get_df(obj, "tree")
if not isinstance(tree, pd.DataFrame):
raise ValueError("tree must be provided for divType='phyl'.")
# Relative abundances
ra = tab.div(tab.sum()) if not use_values_in_tab else tab.astype(float)
#Subset tree to features in tab
tree = subset_tree_df(tree, ra.index.tolist())
# Build branch × sample matrix
tree2 = ra_to_branches(ra, tree)
# Normalize across samples
tree2 = tree2.T
tree2 = tree2.div(tree2.sum()).fillna(0.0)
S_series = tree2.count().astype(float)
D_series = naive_alpha(tree2, q=q, use_values_in_tab=True)
else:
raise ValueError("divType must be 'naive' or 'phyl' when perspective='taxa'.")
# Compute evenness index
if index in ("CR1", "CR2", "regional", "local"):
if q == 1:
df = pd.DataFrame({"D": D_series, "S": S_series}).astype(float)
mask = (df["S"] > 0) & (df["D"] > 0)
logD = np.log(df.loc[mask, "D"])
logS = np.log(df.loc[mask, "S"])
measure = logD / logS
else:
Dp = D_series.astype(float).pow(power)
Sp = S_series.astype(float).pow(power)
measure = (1 - Dp) / (1 - Sp)
elif index == "CR3":
measure = (D_series - 1) / (S_series - 1)
elif index == "CR4":
measure = (1 - 1 / D_series) / (1 - 1 / S_series)
elif index == "CR5":
measure = np.log(D_series) / np.log(S_series)
elif index == "pielou":
if q != 1:
raise ValueError("Pielou's index is only defined for q = 1.")
measure = np.log(D_series) / np.log(S_series)
else:
raise ValueError("index must be one of: CR1, CR2, CR3, CR4, CR5, local, regional, pielou.")
return measure
# -----------------------------------------------------------------------------
# Dissimilarity by feature
# -----------------------------------------------------------------------------
[docs]
def dissimilarity_by_feature(
obj: Union[Dict[str, Any], Any],
*,
by: Optional[str] = None,
q: float = 1,
div_type: str = "naive",
index: str = "regional",
use_values_in_tab: bool = False
) -> pd.DataFrame:
"""
Compute the contribution of individual taxa (or phylogenetic nodes)
to the overall dissimilarity between multiple samples, following
Chao & Ricotta (2019, Ecology 100:e02852).
Supports:
- naive (taxonomic) dissimilarity
- phylogenetic dissimilarity
Parameters
----------
obj : DataFrame | MicrobiomeData-like | dict
Must contain:
- 'tab' : abundance table (features × samples)
- 'meta' : metadata table (optional if by=None)
- 'tree' : phylogenetic tree (required if divType='phyl')
by : str or None, default=None
Metadata column defining sample groups.
If None, all samples are treated as one group.
q : float, default=1
Diversity order.
div_type : {'naive','phyl'}, default='naive'
Type of dissimilarity measure.
index : {'local','regional','CR1','CR2'}, default='regional'
Evenness/dissimilarity index.
use_values_in_tab : bool, default=False
If False, convert abundances to relative abundances.
Returns
-------
pandas.DataFrame
Rows:
- 'dis' : total dissimilarity
- 'N' : number of samples in group
- one row per taxon (naive) or per node (phylogenetic)
Columns:
- one column per category in `by`
"""
# Validate input
tab = get_df(obj, "tab")
if tab.shape[1] < 2:
raise ValueError("At least two samples are required.")
if div_type not in ("naive", "phyl"):
raise ValueError("divType must be 'naive' or 'phyl'.")
if index in ("CR1", "regional"):
idx = "regional"
elif index in ("CR2", "local"):
idx = "local"
else:
raise ValueError("index must be 'local', 'regional', 'CR1', or 'CR2'.")
# Build dictionary of subtables by category
if by is None:
categories = ["all"]
tabdict = {"all": tab.copy()}
else:
meta = get_df(obj, "meta")
if by not in meta.columns:
raise ValueError(f"Column '{by}' not found in metadata.")
categories = meta[by].unique().tolist()
tabdict = {
cat: get_df(subset_samples(obj, by=by, values=[cat]), "tab")
for cat in categories
}
# Prepare output table
if div_type == "naive":
feature_index = ["dis", "N"] + tab.index.tolist()
else:
tree = get_df(obj, "tree")
tree = subset_tree_df(tree, tab.index.tolist())
feature_index = ["dis", "N"] + tree.index.tolist()
out = pd.DataFrame(np.nan, index=feature_index, columns=categories)
# Main loop over categories
for cat in categories:
subtab = tabdict[cat]
N = subtab.shape[1]
out.loc["N", cat] = N
if N < 2:
continue
# Relative abundances
if use_values_in_tab:
ra = subtab.astype(float)
else:
col_sums = subtab.sum()
if (col_sums == 0).any():
raise ValueError(f"Group '{cat}' contains a zero-sum sample.")
ra = subtab.div(col_sums)
# NAIVE VERSION
if div_type == "naive":
# Compute weights w_i
if idx == "regional": # CR1
w = subtab.sum(axis=1).pow(q)
w = w / w.sum()
else: # local = CR2
tab_q = subtab.copy()
mask = tab_q > 0
tab_q[mask] = tab_q[mask].pow(q)
w = tab_q.sum(axis=1) / tab_q.sum().sum()
# Evenness per taxon
ev = evenness(
subtab,
q=q,
div_type="naive",
index=idx,
perspective="taxa",
use_values_in_tab=use_values_in_tab,
)
# Contribution
contrib = w * (1 - ev)
out.loc["dis", cat] = contrib.sum()
out.loc[contrib.index, cat] = 100 * contrib / contrib.sum()
out[cat] = out[cat].fillna(0)
# PHYLOGENETIC VERSION
elif div_type == "phyl":
# Build branch × sample matrix
tree2 = ra_to_branches(ra, tree)
# Evenness per node (correct)
ev = evenness(
{"tab": subtab, "tree": tree},
q=q,
div_type="phyl",
index=idx,
perspective="taxa",
use_values_in_tab=use_values_in_tab,
)
# Compute branch weights
if idx == "regional": # CR1
zi = tree2.sum(axis=1)
zi_q = zi.clip(lower=0).pow(q)
w = (tree["branchL"] * zi_q)
w = w / w.sum()
else: # local = CR2
tree2_q = tree2.clip(lower=0).pow(q)
zv = tree2_q.sum(axis=1)
w = (tree["branchL"] * zv)
w = w / w.sum()
# Contribution
contrib = tree["branchL"] * w * (1 - ev)
out.loc["dis", cat] = contrib.sum()
out.loc[contrib.index, cat] = 100 * contrib / contrib.sum()
out.loc[tree.index, 'nodes'] = tree['nodes']
return out
# -----------------------------------------------------------------------------
# beta MPDq and MNTDq
# -----------------------------------------------------------------------------
[docs]
def beta_mpdq(
obj: Union[Dict[str, Any], Any],
distmat: pd.DataFrame,
*,
q: float = 1.0,
) -> pd.DataFrame:
"""
Computes beta-MPD_q for all sample pairs.
Parameters
----------
obj : MicrobiomeData, dict, or compatible object
Input data. Must provide at least an abundance table ('tab').
distmat : pd.DataFrame
Square distance matrix indexed/columned by feature ids.
q : float, default=1.0
Order of diversity weighting applied to relative abundances.
Returns
-------
pandas.DataFrame (S x S):
"""
from ..model import beta_nriq
return beta_nriq(obj, distmat, q=q, iterations=0)
[docs]
def beta_mntdq(
obj: Union[Dict[str, Any], Any],
distmat: pd.DataFrame,
*,
q: float = 1.0,
include_conspecifics: bool = False,
) -> pd.DataFrame:
"""
Computes beta-MNTD_q for all sample pairs.
Parameters
----------
obj : MicrobiomeData, dict, or compatible object
Input data. Must provide at least an abundance table ('tab').
distmat : pd.DataFrame
Square distance matrix indexed/columned by feature ids.
q : float, default=1.0
Order of diversity weighting applied to relative abundances.
include_conspecifics : bool, default=False
Determines whether conspecifics (identical features shared between samples) are allowed
to contribute zero-distance matches in the nearest-taxon calculation.
Returns
-------
pandas.DataFrame (S x S)
"""
from ..model import beta_ntiq
return beta_ntiq(obj, distmat, q=q, iterations=0, include_conspecifics=include_conspecifics)