import numpy as np
import pandas as pd
from typing import Dict, Optional, Union, Any, Literal, Tuple
from ..diversity import bray, jaccard, naive_beta, phyl_beta, func_beta
from ..utils import get_df
def _get_tqdm(use_tqdm: bool):
"""
Internal helper that returns tqdm if available and requested; otherwise provides
a minimal stub compatible with tqdm's API.
"""
if use_tqdm:
try:
from tqdm import tqdm
return tqdm
except Exception:
pass
class _DummyTqdm: # fallback with same constructor signature
def __init__(self, iterable=None, total=None, desc=None, unit=None, leave=False,
ncols=None, ascii=True, mininterval=None, position=None, miniters=None):
self._iterable = iterable if iterable is not None else range(total or 0)
def __iter__(self):
return iter(self._iterable)
def update(self, *_args, **_kwargs):
return None
def close(self):
return None
return _DummyTqdm
# ---------------------------------------------------------------------------
# RCQ: Null comparisons for beta-diversity
# ---------------------------------------------------------------------------
[docs]
def rcq(
obj: Union[Dict[str, Any], Any],
*,
constrain_by: Optional[str] = None,
randomization: Literal["frequency", "abundance"] = "frequency",
iterations: int = 999,
div_type: Literal["Jaccard", "Bray", "naive", "phyl", "func"] = "naive",
distmat: Optional[pd.DataFrame] = None,
q: float = 1.0,
use_tqdm: bool = True,
random_state: Optional[Union[int, np.random.Generator]] = None,
**kwargs,
) -> Dict[str, pd.DataFrame]:
"""
Raup–Crick-style null comparisons for beta-diversity.
Randomizes the abundance table while preserving each sample's richness and
total reads, then contrasts the observed beta-diversity matrix against a
null distribution built via randomization.
Parameters
----------
obj : MicrobiomeData | dict | Any
Input with at least an abundance table under key 'tab'. Optionally may include
'meta' (sample metadata) and 'tree' (for phylogenetic measures).
constrain_by : str, optional
Column in metadata to constrain randomization within categories; if None, randomize across all samples.
randomization : {"frequency", "abundance"}, default="frequency"
Randomization strategy for selecting the set of taxa per randomized sample:
- "abundance": probabilities proportional to group-level summed abundances
- "frequency": probabilities proportional to group-level presence frequency
Within the selected set, additional reads are allocated proportional to
the selected taxa's group-level abundances to match each sample's total reads.
iterations : int, default=999
Number of randomization iterations used to build the null distribution.
div_type : {"Jaccard", "Bray", "naive", "phyl", "func"}, default="naive"
Dissimilarity index to compute for observed and null tables.
- "Jaccard", "Bray": classic indices on the (randomized) count table
- "naive": Hill-number-based (requires q)
- "phyl": phylogenetic beta diversity (requires 'tree' in obj)
- "func": functional beta diversity (requires distmat)
distmat : pandas.DataFrame, optional
Square functional distance matrix (features × features); required if div_type="func".
q : float, default=1.0
Diversity order for Hill-number-based indices (used by "naive", "phyl", "func").
use_tqdm : bool, default=True
Use `tqdm` for progress bars.
random_state : int | numpy.random.Generator, optional
Random seed or Generator for reproducibility.
Returns
-------
dict
{
"div_type": str,
"obs_d": DataFrame (S × S), observed beta-diversity,
"p": DataFrame (S × S), Raup–Crick probability P(null < obs) + 0.5·P(null == obs),
"null_mean":DataFrame (S × S), mean of null,
"null_std": DataFrame (S × S), std of null,
"ses": DataFrame (S × S), (null_mean - obs) / null_std
}
Notes
-----
- Per-sample constraints: if `constrain_by` is given, randomization is performed within each
metadata category independently to preserve structure. Otherwise, all samples are randomized together.
- Richness & read preservation: for each sample, we draw a set of taxa matching the original
richness, then allocate extra reads to match the original total reads.
- Raup–Crick p-index: counts how often the null dissimilarity is strictly lower than observed,
ties contribute 0.5, normalized by `iterations`.
- A p value close to zero means observed dissimilarity is lower than the null expectation.
- A p value close to one means observed dissimilarity is higher than the null expectation.
- A positive ses means observed dissimilarity is lower than the null expectation.
- A negative ses means observed dissimilarity is higher than the null expectation.
"""
if "seed" in kwargs:
if random_state is not None:
raise TypeError("Specify only one of 'random_state' or 'seed'.")
random_state = kwargs.pop("seed")
if kwargs:
raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}")
# --- Extract tables & context
tab = get_df(obj, "tab")
if tab is None:
raise ValueError("'tab' is needed in input.")
tab = tab.copy()
if tab.empty:
raise ValueError("'tab' must be a non-empty DataFrame.")
meta = None
tree = None
if hasattr(obj, "meta") or (isinstance(obj, dict) and "meta" in obj):
try:
meta = get_df(obj, "meta")
except Exception:
meta = None
if div_type == "phyl":
tree = get_df(obj, "tree")
if div_type == "func":
if not isinstance(distmat, pd.DataFrame):
raise ValueError("div_type='func' requires a pandas DataFrame 'distmat'.")
# Align to feature order
distmat = distmat.loc[tab.index, tab.index].copy()
if constrain_by is not None:
if meta is None or constrain_by not in meta.columns:
raise ValueError("constrain_by requires a metadata DataFrame containing the specified column.")
if iterations < 1:
raise ValueError("iterations must be >= 1.")
if randomization not in {"abundance", "frequency"}:
raise ValueError("randomization must be 'abundance' or 'frequency'.")
# --- RNG + tqdm
rng = random_state if isinstance(random_state, np.random.Generator) else np.random.default_rng(random_state)
tqdm = _get_tqdm(use_tqdm)
# --- Partition samples into constraint groups
if constrain_by is None:
groups = [tab.columns.tolist()]
else:
cats = pd.unique(meta[constrain_by])
groups = [meta.index[meta[constrain_by] == cat].tolist() for cat in cats]
# --- Precompute per-group selection probabilities and sample stats
features = tab.index.tolist()
group_specs = []
for smp_list in groups:
subtab = tab[smp_list]
# selection probs
if randomization == "abundance":
abund_series = subtab.sum(axis=1).astype(float) # (N,)
abund_total = float(abund_series.sum())
if abund_total <= 0:
sel_p = np.full(len(features), 1.0 / len(features), dtype=float)
else:
sel_p = (abund_series / abund_total).to_numpy()
# store for within-selected allocation too
group_specs.append(("abundance", smp_list, sel_p, abund_series))
else: # "frequency"
sub_bin = (subtab > 0).astype(np.int8)
freq_counts = sub_bin.sum(axis=1).to_numpy(dtype=np.int64) # (N,)
if int(freq_counts.sum()) == 0:
sel_p = np.full(len(features), 1.0 / len(features), dtype=float)
else:
sel_p = freq_counts / int(freq_counts.sum())
# for allocation we still use abundances (fallback uniform if zero)
abund_series = subtab.sum(axis=1).astype(float)
group_specs.append(("frequency", smp_list, sel_p, abund_series))
# per-sample richness and read totals (preserved)
richness_vec = (tab > 0).sum(axis=0).to_numpy(dtype=np.int64) # (S,)
reads_vec = tab.sum(axis=0).to_numpy(dtype=np.int64) # (S,)
smp_index = {smp: i for i, smp in enumerate(tab.columns)}
# --- Helper: compute beta-diversity for a table (dispatch)
def _beta_for_table(t: pd.DataFrame) -> pd.DataFrame:
if div_type.lower() == "bray":
return bray(t)
if div_type.lower() == "jaccard":
return jaccard(t)
if div_type == "naive":
return naive_beta(t, q=q)
if div_type == "phyl":
return phyl_beta({"tab": t, "tree": tree}, q=q)
if div_type == "func":
return func_beta(t, distmat, q=q)
raise ValueError("Unsupported div_type. Choose among {'Jaccard','Bray','naive','phyl','func'}.")
# --- Observed beta-diversity
obs_beta = _beta_for_table(tab)
obs_arr = obs_beta.to_numpy()
# --- Streaming accumulators (S × S)
mu = np.zeros_like(obs_arr, dtype=np.float64) # null mean
M2 = np.zeros_like(obs_arr, dtype=np.float64) # for variance
count_lt = np.zeros_like(obs_arr, dtype=np.int64) # p-index counts (null < obs)
count_eq = np.zeros_like(obs_arr, dtype=np.int64) # p-index ties
# --- Iteration loop
for t in tqdm(
range(1, iterations + 1),
desc="iterations",
unit="iter",
leave=False,
ncols=80,
ascii=True,
mininterval=0.5,
position=0,
miniters=1,
):
# fresh randomized table (zeros)
rtab = pd.DataFrame(0, index=tab.index, columns=tab.columns, dtype=np.int64)
# randomize within each group
for mode, smp_list, sel_p, abund_series in group_specs:
# shared pre-objects
features_arr = np.array(features, dtype=object)
smp_cols = smp_list
for smp in smp_cols:
sidx = smp_index[smp]
richness = int(richness_vec[sidx])
reads = int(reads_vec[sidx])
if richness <= 0 or reads <= 0:
continue
# 1) draw a set of taxa of size 'richness' without replacement
rows = rng.choice(features_arr, size=richness, replace=False, p=sel_p)
# mark presence (1) for selected taxa
rtab.loc[rows, smp] = 1
# 2) allocate extra reads to match total counts
extra = reads - richness
if extra > 0:
# probabilities within the selected set proportional to group abundance
sub_abund = abund_series.loc[rows].to_numpy(dtype=float)
sub_total = float(sub_abund.sum())
if sub_total > 0:
sub_p = sub_abund / sub_total
else:
sub_p = np.full(len(rows), 1.0 / len(rows), dtype=float)
# sample with replacement and accumulate counts
draws = rng.choice(rows, size=extra, replace=True, p=sub_p)
uniq, cnts = np.unique(draws, return_counts=True)
rtab.loc[uniq, smp] += cnts
# compute null beta for this iteration
null_beta = _beta_for_table(rtab)
x = null_beta.to_numpy()
# --- Welford updates
delta = x - mu
mu += delta / t
M2 += delta * (x - mu)
# --- p-index bookkeeping (Raup–Crick)
count_lt += (x < obs_arr)
count_eq += (x == obs_arr)
# --- Finalize statistics
denom_var = max(1, iterations - 1)
null_mean = mu
null_std = np.sqrt(np.maximum(M2 / denom_var, 0.0))
p = (count_lt + 0.5 * count_eq) / iterations
with np.errstate(invalid="ignore", divide="ignore"):
ses_arr = np.where(null_std > 0, (null_mean - obs_arr) / null_std, np.nan)
# --- Pack DataFrames
index_cols = tab.columns.tolist()
out = {
"div_type": f"{div_type}_q={q}" if div_type in {"naive", "phyl", "func"} else div_type,
"obs_d": pd.DataFrame(obs_arr, index=index_cols, columns=index_cols),
"p": pd.DataFrame(p, index=index_cols, columns=index_cols),
"null_mean":pd.DataFrame(null_mean, index=index_cols, columns=index_cols),
"null_std": pd.DataFrame(null_std, index=index_cols, columns=index_cols),
"ses": pd.DataFrame(ses_arr, index=index_cols, columns=index_cols),
}
return out
# ---------------------------------------------------------------------------
# MPDq and NRIq
# ---------------------------------------------------------------------------
[docs]
def nriq(
obj: Union[Dict[str, Any], Any],
distmat: pd.DataFrame,
*,
q: float = 1.0,
iterations: int = 999,
randomization: Literal["features", "abundances"] = "features",
use_tqdm: bool = True,
random_state: Optional[Union[int, np.random.Generator]] = None,
**kwargs,
) -> pd.DataFrame:
"""
Net Relatedness Index (NRI) with q-weighting of relative abundances.
Accepts either a MicrobiomeData object or a dict with at least a 'tab' DataFrame.
Parameters
----------
obj : MicrobiomeData, dict, or compatible object
Input data. Must provide at least an abundance table ('tab').
distmat : pd.DataFrame
Square distance matrix indexed/columned by feature ids.
q : float, default=1.0
Order of diversity weighting applied to relative abundances.
iterations : int, default=999
Number of random permutations of distmat.
randomization : {'features', 'abundances'}, default='features'
Randomization strategy. Shuffle features in the phylogenetic tree
or relative abundance values in each sample.
use_tqdm : bool, default=True
Use tqdm for progress bars.
random_state : int or np.random.Generator, optional
Random seed or generator for reproducibility.
Returns
-------
pandas.DataFrame
Indexed by sample names with columns:
- 'MPDq'
- 'null_mean'
- 'null_std'
- 'p' (Pr[ null < observed ] + 0.5*ties) / iterations
- 'ses' (null_mean - observed) / null_std
Notes
-----
- A p value close to zero means that the observed MPD is lower than the null expectation
- A p value close to one means that the observed MPD is higher than the null expectation
- A positive ses means that the observed MPD is lower than the null expectation
- A negative ses means that the observed MPD is higher than the null expectation
References
----------
Webb et al. (2002) *American Naturalist*.
"""
if "seed" in kwargs:
if random_state is not None:
raise TypeError("Specify only one of 'random_state' or 'seed'.")
random_state = kwargs.pop("seed")
if kwargs:
raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}")
# Robustly extract the abundance table using get_df
tab = get_df(obj, "tab")
if tab is None or tab.empty:
raise ValueError("obj must contain a pandas DataFrame under key 'tab'.")
if not set(tab.index).issubset(set(distmat.index)) or not set(tab.index).issubset(set(distmat.columns)):
missing = sorted(list(set(tab.index) - set(distmat.index)))
raise ValueError(
f"distmat must include all feature ids from tab.index. Missing count: {len(missing)} (e.g., {missing[:5]})"
)
smplist = tab.columns
D = distmat.loc[tab.index, tab.index].to_numpy()
R = (tab / tab.sum(axis=0)).to_numpy()
if q == 1.0:
Rq = R
else:
mask = R > 0
Rq = R.copy()
Rq[mask] = np.power(Rq[mask], q)
def _alpha_mpdq(_D, _Rq):
# sum_i w_i
z = _Rq.sum(axis=0)
# full numerator w^T D w (includes diagonal)
num_full = (_Rq * (_D @ _Rq)).sum(axis=0)
w2 = (_Rq * _Rq).sum(axis=0)
den = z*z - w2 # denominator excluding diagonal
present_counts = (_Rq > 0).sum(axis=0) # (S,)
too_small = present_counts < 2 # boolean mask
out = np.full_like(den, np.nan, dtype=float)
valid = (~too_small) & (den > 0)
out[valid] = num_full[valid] / den[valid]
return out
present_counts = (R > 0).sum(axis=0) # shape (S,)
obs = _alpha_mpdq(D, Rq)
obs[present_counts < 2] = np.nan
# streaming stats init
n = Rq.shape[1]
mu = np.zeros(n, dtype=np.float64)
M2 = np.zeros(n, dtype=np.float64)
count_lt = np.zeros(n, dtype=np.int64)
count_eq = np.zeros(n, dtype=np.int64)
rng = random_state if isinstance(random_state, np.random.Generator) else np.random.default_rng(random_state)
tqdm = _get_tqdm(use_tqdm)
if iterations < 1:
return(pd.DataFrame({"MPDq": obs}, index=smplist))
if randomization not in {"features", "abundances"}:
raise ValueError("randomization must be 'features' or 'abundances'.")
# null loop
for t in tqdm(
range(1, iterations + 1),
desc="iterations",
unit="iter",
leave=False,
ncols=80,
ascii=True,
mininterval=0.5,
position=0,
miniters=1,
):
if randomization == "features":
# permute features once and apply to all samples (permute rows of Rq)
perm = rng.permutation(Rq.shape[0])
Rq_perm = Rq[perm, :]
x = _alpha_mpdq(D, Rq_perm)
elif randomization == "abundances":
# shuffle abundances within each sample (permute rows per column)
# permute a view of Rq columnwise
Rq_perm = np.empty_like(Rq)
for j in range(n):
Rq_perm[:, j] = Rq[rng.permutation(Rq.shape[0]), j]
x = _alpha_mpdq(D, Rq_perm)
# Welford updates
delta = x - mu
mu += delta / t
M2 += delta * (x - mu)
# p-index counts vs observed
count_lt += (x < obs)
count_eq += (x == obs)
null_mean = mu
null_std = np.sqrt(np.maximum(M2 / max(1, (iterations - 1)), 0.0))
p = (count_lt + 0.5 * count_eq) / iterations
ses = np.where(null_std > 0, (null_mean - obs) / null_std, np.nan)
output = pd.DataFrame(
{
"MPDq": obs,
"null_mean": null_mean,
"null_std": null_std,
"p": p,
"ses": ses,
},
index=smplist
)
return output
# ---------------------------------------------------------------------------
# NTIq
# ---------------------------------------------------------------------------
[docs]
def ntiq(
obj: Union[Dict[str, Any], Any],
distmat: pd.DataFrame,
*,
q: float = 1.0,
iterations: int = 999,
randomization: Literal["features", "abundances"] = "features",
use_tqdm: bool = True,
random_state: Optional[Union[int, np.random.Generator]] = None,
**kwargs,
) -> pd.DataFrame:
"""
Nearest Taxon Index (NTI) with q-weighting of relative abundances.
Computes MNTD_q (mean nearest-taxon distance with q-weighted abundances),
then compares to a null obtained by either permuting feature labels
("features") or shuffling abundances within each sample ("abundances").
Parameters
----------
obj : MicrobiomeData, dict, or compatible object
Input data. Must provide at least an abundance table ('tab').
distmat : pd.DataFrame
Square distance matrix indexed/columned by feature ids.
q : float, default=1.0
Order of diversity weighting applied to relative abundances.
iterations : int, default=999
Number of random permutations of distmat.
randomization : {'features', 'abundances'}, default='features'
Randomization strategy. Shuffle features in the phylogenetic tree
or relative abundance values in each sample.
use_tqdm : bool, default=True
Use tqdm for progress bars.
random_state : int or np.random.Generator, optional
Random seed or generator for reproducibility.
Returns
-------
pandas.DataFrame
Indexed by sample names with columns:
- 'MNTDq'
- 'null_mean'
- 'null_std'
- 'p' (Pr[ null < observed ] + 0.5*ties) / iterations
- 'ses' (null_mean - observed) / null_std
Notes
-----
- A p value close to zero means that the observed MPNTD is lower than the null expectation
- A p value close to one means that the observed MNTD is higher than the null expectation
- A positive ses means that the observed MNTD is lower than the null expectation
- A negative ses means that the observed MNTD is higher than the null expectation
"""
if "seed" in kwargs:
if random_state is not None:
raise TypeError("Specify only one of 'random_state' or 'seed'.")
random_state = kwargs.pop("seed")
if kwargs:
raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}")
# --- Input & alignment ---
tab = get_df(obj, "tab")
if tab is None or tab.empty:
raise ValueError("obj must contain a pandas DataFrame under key 'tab'.")
if not set(tab.index).issubset(set(distmat.index)) or not set(tab.index).issubset(set(distmat.columns)):
missing = sorted(list(set(tab.index) - set(distmat.index)))
raise ValueError(
f"distmat must include all feature ids from tab.index. Missing count: {len(missing)} (e.g., {missing[:5]})"
)
smplist = tab.columns
D = distmat.loc[tab.index, tab.index].to_numpy() # (N x N)
# Relative abundances
R = (tab / tab.sum(axis=0)).to_numpy(dtype=float) # (N x S)
# q-weighting (only for positive entries)
if q == 1.0:
Rq = R
else:
Rq = R.copy()
mask_pos = Rq > 0
Rq[mask_pos] = np.power(Rq[mask_pos], q)
N, S = Rq.shape
# --- Helper: compute vector of MNTD_q for all samples in one go ---
# For each sample s:
# 1) take presence mask m = (R[:, s] > 0)
# 2) within D[m, m], set diagonal to +inf and take rowwise min -> dmin (per-present feature)
# 3) aggregate: sum( w_i^q * dmin_i ) / sum( w_i^q ) where w_i = R[:, s]
def _mntdq_all(D: np.ndarray, R_used: np.ndarray, Rq_used: np.ndarray) -> np.ndarray:
S = R_used.shape[1]
out = np.full(S, np.nan, dtype=float)
for s in range(S):
m = R_used[:, s] > 0.0
k = int(m.sum())
if k < 2:
out[s] = np.nan
continue
# then:
D_sub = D[np.ix_(m, m)].copy()
np.fill_diagonal(D_sub, np.inf)
dmin = D_sub.min(axis=1)
# q-weighted aggregation
wq = Rq_used[m, s]
denom = float(wq.sum())
if denom > 0.0:
out[s] = float((wq * dmin).sum() / denom)
else:
out[s] = np.nan
return out
# --- Observed MNTD_q ---
obs = _mntdq_all(D, R, Rq)
# --- Streaming (Welford) statistics over null iterations ---
mu = np.zeros(S, dtype=np.float64) # null_mean (per sample)
M2 = np.zeros(S, dtype=np.float64) # for variance
count_lt = np.zeros(S, dtype=np.int64) # for p-index
count_eq = np.zeros(S, dtype=np.int64)
rng = random_state if isinstance(random_state, np.random.Generator) else np.random.default_rng(random_state)
tqdm = _get_tqdm(use_tqdm)
if iterations < 1:
return(pd.DataFrame({"MNTDq": obs}, index=smplist))
if randomization not in {"features", "abundances"}:
raise ValueError("randomization must be 'features' or 'abundances'.")
# Null loop
for t in tqdm(
range(1, iterations + 1),
desc="iterations",
unit="iter",
leave=False,
ncols=80,
ascii=True,
mininterval=0.5,
position=0,
miniters=1,
):
if randomization == "features":
# Permute feature identities once per iteration (relabels rows of Rq)
perm = rng.permutation(N)
R_perm = R[perm, :]
if q == 1.0:
Rq_perm = R_perm
else:
Rq_perm = R_perm.copy()
posp = Rq_perm > 0.0
Rq_perm[posp] = np.power(Rq_perm[posp], q)
x = _mntdq_all(D, R_perm, Rq_perm)
else: # "abundances"
R_perm = np.empty_like(R)
for j in range(R.shape[1]):
R_perm[:, j] = R[rng.permutation(R.shape[0]), j]
if q == 1.0:
Rq_perm = R_perm
else:
Rq_perm = R_perm.copy()
posp = Rq_perm > 0.0
Rq_perm[posp] = np.power(Rq_perm[posp], q)
x = _mntdq_all(D, R_perm, Rq_perm)
# --- Welford updates (vectorized) ---
delta = x - mu
mu += delta / t
M2 += delta * (x - mu)
# --- p-index bookkeeping against observed ---
# count how often null < obs, with 0.5 for ties
count_lt += (x < obs)
count_eq += (x == obs)
# Finalize stats
null_mean = mu
# population std estimate across iterations: sqrt(M2 / max(1, iterations-1))
null_std = np.sqrt(np.maximum(M2 / max(1, (iterations - 1)), 0.0))
p = (count_lt + 0.5 * count_eq) / iterations
ses = np.where(null_std > 0, (null_mean - obs) / null_std, np.nan)
# Pack results
output = pd.DataFrame(
{
"MNTDq": obs,
"null_mean": null_mean,
"null_std": null_std,
"p": p,
"ses": ses,
},
index=smplist,
)
return output
# ---------------------------------------------------------------------------
# beta-NRIq
# ---------------------------------------------------------------------------
[docs]
def beta_nriq(
obj: Union[Dict[str, Any], Any],
distmat: pd.DataFrame,
*,
q: float = 1.0,
iterations: int = 999,
randomization: Literal["features", "abundances"] = "features",
use_tqdm: bool = True,
random_state: Optional[Union[int, np.random.Generator]] = None,
**kwargs,
) -> Dict[str, pd.DataFrame]:
"""
Computes beta-MPD_q for all sample pairs, then contrasts against a null
generated by (a) feature label permutations ("features") or
(b) within-sample abundance shuffles ("abundances").
Parameters
----------
obj : MicrobiomeData, dict, or compatible object
Input data. Must provide at least an abundance table ('tab').
distmat : pd.DataFrame
Square distance matrix indexed/columned by feature ids.
q : float, default=1.0
Order of diversity weighting applied to relative abundances.
iterations : int, default=999
Number of random permutations of distmat.
randomization : {'features', 'abundances'}, default='features'
Randomization strategy. Shuffle features in the phylogenetic tree
or relative abundance values in each sample.
use_tqdm : bool, default=True
Use tqdm for progress bars.
random_state : int or np.random.Generator, optional
Random seed or generator for reproducibility.
Returns
-------
dict of pandas.DataFrame (S x S):
'beta_MPDq' : observed beta-MPD_q
'null_mean' : mean of null beta-MPD_q
'null_std' : std of null beta-MPD_q
'p' : (count(null < obs) + 0.5 * ties) / iterations
'ses' : (null_mean - obs) / null_std
Notes
-----
- Returns a dataframe with observed beta_MPDq if iterations=0, otherwise a dictionary is returned
- A p value close to zero means that the observed MPD between samples is lower than the null expectation
- A p value close to one means that the observed MPD between samples is higher than the null expectation
- A positive ses means that the observed MPD between samples is lower than the null expectation
- A negative ses means that the observed MPD between samples is higher than the null expectation
"""
if "seed" in kwargs:
if random_state is not None:
raise TypeError("Specify only one of 'random_state' or 'seed'.")
random_state = kwargs.pop("seed")
if kwargs:
raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}")
# --- Input & alignment ---
tab = get_df(obj, "tab")
if tab is None or tab.empty:
raise ValueError("obj must contain a non-empty pandas DataFrame under key 'tab'.")
if not set(tab.index).issubset(set(distmat.index)) or not set(tab.index).issubset(set(distmat.columns)):
missing = sorted(list(set(tab.index) - set(distmat.index)))
raise ValueError(
f"distmat must include all feature ids from tab.index. "
f"Missing count: {len(missing)} (e.g., {missing[:5]})"
)
smplist = tab.columns
D = distmat.loc[tab.index, tab.index].to_numpy() # (N x N), float
# Relative abundances (N x S)
R = (tab / tab.sum(axis=0)).to_numpy(dtype=float)
# q-weighting: only positives are powered (consistent with your nriq)
if q == 1.0:
Rq = R
else:
Rq = R.copy()
mask_pos = Rq > 0.0
Rq[mask_pos] = np.power(Rq[mask_pos], q)
N, S = Rq.shape
# --- Observed beta-MPD_q for all pairs (vectorized) ---
# obs[s,t] = sum_{i,j} Rq[i,s] * D[i,j] * Rq[j,t] / (sum_i Rq[i,s] * sum_j Rq[j,t])
M_obs = D @ Rq # (N x S)
num_obs = Rq.T @ M_obs # (S x S)
z = Rq.sum(axis=0) # (S,)
den_obs = z[:, None] * z[None, :] # (S x S)
with np.errstate(invalid="ignore", divide="ignore"):
obs = num_obs / den_obs
# --- Streaming (Welford) over null iterations ---
mu = np.zeros((S, S), dtype=np.float64)
M2 = np.zeros((S, S), dtype=np.float64)
count_lt = np.zeros((S, S), dtype=np.int64)
count_eq = np.zeros((S, S), dtype=np.int64)
rng = random_state if isinstance(random_state, np.random.Generator) else np.random.default_rng(random_state)
tqdm = _get_tqdm(use_tqdm)
if iterations < 1:
df_obs = pd.DataFrame(obs, index=smplist, columns=smplist)
np.fill_diagonal(df_obs.to_numpy(), np.nan)
return df_obs
if randomization not in {"features", "abundances"}:
raise ValueError("randomization must be 'features' or 'abundances'.")
for t in tqdm(
range(1, iterations + 1),
desc="iterations",
unit="iter",
leave=False,
ncols=80,
ascii=True,
mininterval=0.5,
position=0,
miniters=1,
):
if randomization == "features":
# Permute feature identities once; apply to all samples
perm = rng.permutation(N)
Rq_perm = Rq[perm, :]
else: # "abundances"
# Shuffle abundances independently within each sample (permute rows per column)
Rq_perm = np.empty_like(Rq)
for j in range(S):
Rq_perm[:, j] = Rq[rng.permutation(N), j]
# Null beta-MPD_q (vectorized)
M_null = D @ Rq_perm
num_null = Rq_perm.T @ M_null
z_null = Rq_perm.sum(axis=0)
den_null = z_null[:, None] * z_null[None, :]
with np.errstate(invalid="ignore", divide="ignore"):
x = num_null / den_null # (S x S)
# Welford
delta = x - mu
mu += delta / t
M2 += delta * (x - mu)
# p-index vs observed
count_lt += (x < obs)
count_eq += (x == obs)
# Finalize stats
denom_var = max(1, iterations - 1)
null_mean = mu
null_std = np.sqrt(np.maximum(M2 / denom_var, 0.0))
p = (count_lt + 0.5 * count_eq) / iterations
with np.errstate(invalid="ignore", divide="ignore"):
ses = np.where(null_std > 0, (null_mean - obs) / null_std, np.nan)
# Build DataFrames
idxcols = list(smplist)
df_obs = pd.DataFrame(obs, index=idxcols, columns=idxcols)
df_mean = pd.DataFrame(null_mean, index=idxcols, columns=idxcols)
df_std = pd.DataFrame(null_std, index=idxcols, columns=idxcols)
df_p = pd.DataFrame(p, index=idxcols, columns=idxcols)
df_ses = pd.DataFrame(ses, index=idxcols, columns=idxcols)
for df in (df_obs, df_mean, df_std, df_p, df_ses):
np.fill_diagonal(df.values, np.nan)
return {
"beta_MPDq": df_obs,
"null_mean": df_mean,
"null_std": df_std,
"p": df_p,
"ses": df_ses,
}
# ---------------------------------------------------------------------------
# beta-NTIq
# ---------------------------------------------------------------------------
[docs]
def beta_ntiq(
obj: Union[Dict[str, Any], Any],
distmat: pd.DataFrame,
*,
q: float = 1.0,
iterations: int = 999,
include_conspecifics: bool = False,
randomization: Literal["features", "abundances"] = "features",
use_tqdm: bool = True,
random_state: Optional[Union[int, np.random.Generator]] = None,
**kwargs,
) -> Dict[str, pd.DataFrame]:
"""
Computes beta-MNTD_q (mean nearest-taxon distance with q-weighted abundances)
for all sample pairs, then contrasts the observed matrix against a null
distribution generated by randomization:
- randomization="features": permute feature identities (rows) identically across samples
- randomization="abundances": shuffle abundances within each sample (column-wise)
The null distribution is aggregated online using Welford updates, yielding
per-pair null mean, null std, tie-aware p-index, and standardized effect size.
Parameters
----------
obj : MicrobiomeData | dict | Any
Input with at least an abundance table under key 'tab'.
distmat : pandas.DataFrame
Square distance matrix (features × features) whose index/columns include `tab.index`.
q : float, default=1.0
Diversity order used to weight relative abundances (applied only to strictly positive entries).
iterations : int, default=999
Number of randomization iterations used to build the null distribution.
include_conspecifics : bool, default=False
Determines whether conspecifics (identical features shared between samples) are allowed
to contribute zero-distance matches in the nearest-taxon calculation.
randomization : {"features", "abundances"}, default="features"
Randomization strategy for the null model:
- "features": permute feature identities identically for all samples (tip-label permutation).
- "abundances": shuffle abundances within each sample (column-wise permutation).
use_tqdm : bool, default=True
Use `tqdm` for progress bars (a lightweight stub is used if `tqdm` is unavailable).
random_state : int | numpy.random.Generator, optional
Random seed or Generator for reproducibility.
Returns
-------
dict of pandas.DataFrame
Full (samples × samples) matrices:
- 'beta_MNTDq' : observed beta-MNTD_q
- 'null_mean' : mean of null beta-MNTD_q
- 'null_std' : std of null beta-MNTD_q
- 'p' : (count(null < observed) + 0.5 * ties) / iterations
- 'ses' : (null_mean - observed) / null_std
Diagonal entries are set to NaN.
Notes
-----
- Returns a dataframe with observed beta_MNTDq if iterations=0, otherwise a dictionary is returned
- A p value close to zero means that the observed MNTD between samples is lower than the null expectation
- A p value close to one means that the observed MNTD between samples is higher than the null expectation
- A positive ses means that the observed MNTD between samples is lower than the null expectation
- A negative ses means that the observed MNTD between samples is higher than the null expectation
References
----------
Webb et al. (2002) American Naturalist.
Stegen et al. (2013) ISME Journal.
"""
if "seed" in kwargs:
if random_state is not None:
raise TypeError("Specify only one of 'random_state' or 'seed'.")
random_state = kwargs.pop("seed")
if kwargs:
raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}")
# ---- Input & alignment ----
tab = get_df(obj, "tab")
if tab is None or tab.empty:
raise ValueError("obj must contain a non-empty pandas DataFrame under key 'tab'.")
if not set(tab.index).issubset(set(distmat.index)) or not set(tab.index).issubset(set(distmat.columns)):
missing = sorted(list(set(tab.index) - set(distmat.index)))
raise ValueError(
f"distmat must include all feature ids from tab.index. Missing count: {len(missing)} (e.g., {missing[:5]})"
)
smplist = tab.columns
D = distmat.loc[tab.index, tab.index].to_numpy(copy=True) # (N x N), float
# Relative abundances (N x S), allowing potential NaNs if a column sums to zero
R = (tab / tab.sum(axis=0)).to_numpy(dtype=float) # (N x S)
N, S = R.shape
# q-weighting on positives only (consistent with nriq)
if q == 1.0:
Rq = R.copy()
else:
Rq = R.copy()
pos = Rq > 0.0
Rq[pos] = np.power(Rq[pos], q)
# Column totals of q-weighted abundances per sample
z = Rq.sum(axis=0) # (S,)
# Where z == 0, we will later produce NaN divisions
# ---- Helper: compute full directed MNTD_q matrices in a vectorized way ----
# Given presence mask B (N x S, boolean) and q-weights Rq (N x S),
# we need two N×S "nearest-to-set" mats:
# Delta_col[:, t] = min over j in sample t (D[:, j])
# Delta_row[:, s] = min over i in sample s (D[i, :])
# Then
# A = (Rq.T @ Delta_col) / z[:, None] # s → t
# B = ((Rq.T @ Delta_row).T) / z[None, :] # t → s
# beta_MNTD_q = 0.5 * (A + B)
def _delta_col_row(D: np.ndarray, B: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Compute Delta_col (N × S) and Delta_row (N × S) with optional
exclusion of conspecifics (i == j)."""
Delta_col = np.full((N, S), np.nan, dtype=float)
Delta_row = np.full((N, S), np.nan, dtype=float)
for t in range(S):
mask_t = B[:, t]
if not np.any(mask_t):
continue
sub = D[:, mask_t] # (N × k)
if not include_conspecifics:
# If feature i is present in sample t, set its self-distance to +inf
diag_mask = np.zeros_like(sub, dtype=bool)
diag_indices = np.where(mask_t)[0]
diag_mask[diag_indices, np.arange(len(diag_indices))] = True
sub = sub.copy()
sub[diag_mask] = np.inf
Delta_col[:, t] = sub.min(axis=1)
for s in range(S):
mask_s = B[:, s]
if not np.any(mask_s):
continue
sub = D[mask_s, :] # (k × N)
if not include_conspecifics:
diag_indices = np.where(mask_s)[0]
sub = sub.copy()
sub[np.arange(len(diag_indices)), diag_indices] = np.inf
Delta_row[:, s] = sub.min(axis=0)
return Delta_col, Delta_row
def _beta_mntdq_full(D: np.ndarray, R: np.ndarray, Rq: np.ndarray) -> np.ndarray:
"""Observed (or null) full-matrix beta-MNTD_q from D, R (presence), and Rq (q-weights)."""
B = R > 0.0 # presence/absence for each sample
Delta_col, Delta_row = _delta_col_row(D, B)
# Clean directed matrices
Delta_col = np.nan_to_num(Delta_col, nan=0.0, posinf=0.0, neginf=0.0)
Delta_row = np.nan_to_num(Delta_row, nan=0.0, posinf=0.0, neginf=0.0)
# Numerators for directed terms
A_num = Rq.T @ Delta_col # (S x S)
B_num = (Rq.T @ Delta_row).T # (S x S) via transpose for (t → s)
# Denominators (broadcast)
with np.errstate(invalid="ignore", divide="ignore"):
A = A_num / z[:, None]
Bdir = B_num / z[None, :]
beta = 0.5 * (A + Bdir)
return beta
# ---- Observed beta-MNTD_q ----
obs = _beta_mntdq_full(D, R, Rq) # (S x S)
# ---- Streaming (Welford) over null iterations ----
mu = np.zeros((S, S), dtype=np.float64)
M2 = np.zeros((S, S), dtype=np.float64)
count_lt = np.zeros((S, S), dtype=np.int64)
count_eq = np.zeros((S, S), dtype=np.int64)
rng = random_state if isinstance(random_state, np.random.Generator) else np.random.default_rng(random_state)
tqdm = _get_tqdm(use_tqdm)
if iterations < 1:
df_obs = pd.DataFrame(obs, index=smplist, columns=smplist)
np.fill_diagonal(df_obs.to_numpy(), np.nan)
return df_obs
if randomization not in {"features", "abundances"}:
raise ValueError("randomization must be 'features' or 'abundances'.")
for t in tqdm(
range(1, iterations + 1),
desc="iterations",
unit="iter",
leave=False,
ncols=80,
ascii=True,
mininterval=0.5,
position=0,
miniters=1,
):
if randomization == "features":
# Permute feature identities identically across samples
perm = rng.permutation(N)
R_perm = R[perm, :]
else: # "abundances"
# Shuffle abundances within each sample (column-wise)
R_perm = np.empty_like(R)
for j in range(S):
R_perm[:, j] = R[rng.permutation(N), j]
# Recompute q-weights from the permuted R (keeps presence mask consistent with R)
if q == 1.0:
Rq_perm = R_perm
else:
Rq_perm = R_perm.copy()
posp = Rq_perm > 0.0
Rq_perm[posp] = np.power(Rq_perm[posp], q)
x = _beta_mntdq_full(D, R_perm, Rq_perm) # (S x S) null draw
# Welford updates
delta = x - mu
mu += delta / t
M2 += delta * (x - mu)
# p-index bookkeeping
count_lt += (x < obs)
count_eq += (x == obs)
# Finalize stats
denom_var = max(1, iterations - 1)
null_mean = mu
null_std = np.sqrt(np.maximum(M2 / denom_var, 0.0))
p = (count_lt + 0.5 * count_eq) / iterations
with np.errstate(invalid="ignore", divide="ignore"):
ses = np.where(null_std > 0, (null_mean - obs) / null_std, np.nan)
# Build DataFrames
idxcols = list(smplist)
df_obs = pd.DataFrame(obs, index=idxcols, columns=idxcols)
df_mean = pd.DataFrame(null_mean, index=idxcols, columns=idxcols)
df_std = pd.DataFrame(null_std, index=idxcols, columns=idxcols)
df_p = pd.DataFrame(p, index=idxcols, columns=idxcols)
df_ses = pd.DataFrame(ses, index=idxcols, columns=idxcols)
# Set diagonals to NaN for all outputs (consistent with prior beta-* functions)
for df in (df_obs, df_mean, df_std, df_p, df_ses):
np.fill_diagonal(df.to_numpy(), np.nan)
return {
"beta_MNTDq": df_obs,
"null_mean": df_mean,
"null_std": df_std,
"p": df_p,
"ses": df_ses,
}