Source code for episia.stats.contingency

"""
Contingency Core 2x2 contingency table calculations for epidemiology.

This module provides the Table2x2 class for performing epidemiological
calculations on 2x2 contingency tables, including risk ratios, odds ratios,
risk differences, and various confidence intervals.
"""

import numpy as np
from typing import Tuple, Dict, Optional, Union
from dataclasses import dataclass
from enum import Enum

[docs] class ConfidenceMethod(Enum): """Methods for calculating confidence intervals.""" WALD = "wald" DELTA = "delta" SCORE = "score" EXACT = "exact"
[docs] @dataclass class RiskRatioResult: """Result object for Risk Ratio calculations.""" estimate: float ci_lower: float ci_upper: float method: str table: "Table2x2" null_value: float = 1.0
[docs] def __repr__(self) -> str: sig = " *" if self.significant else "" return f"Risk Ratio: {self.estimate:.3f} ({self.ci_lower:.3f}-{self.ci_upper:.3f}){sig}"
@property def significant(self) -> bool: """True if the 95% CI does not contain the null value (1.0).""" return not (self.ci_lower <= self.null_value <= self.ci_upper) @property def p_value(self) -> Optional[float]: """Approximate two-sided p-value from the CI (Wald method).""" import math if self.estimate <= 0: return None try: from scipy import stats as _stats se = (math.log(self.ci_upper) - math.log(self.ci_lower)) / (2 * 1.96) if se == 0: return None z = abs(math.log(self.estimate)) / se return float(2 * (1 - _stats.norm.cdf(z))) except Exception: return None
[docs] def to_dict(self) -> Dict: """Return a JSON-serializable dictionary.""" return { "measure": "risk_ratio", "estimate": self.estimate, "ci_lower": self.ci_lower, "ci_upper": self.ci_upper, "method": self.method, "significant": self.significant, "p_value": self.p_value, "null_value": self.null_value, "table": self.table.to_dict(), }
[docs] def plot(self, backend: str = "plotly", **kwargs): """Forest-style plot for this risk ratio.""" from api.results import make_ci, make_association result = make_association( measure="risk_ratio", estimate=self.estimate, ci_lower=self.ci_lower, ci_upper=self.ci_upper, p_value=self.p_value, null_value=self.null_value, ) return result.plot(backend=backend, **kwargs)
[docs] @dataclass class OddsRatioResult: """Result object for Odds Ratio calculations.""" estimate: float ci_lower: float ci_upper: float method: str table: "Table2x2" null_value: float = 1.0
[docs] def __repr__(self) -> str: sig = " *" if self.significant else "" return f"Odds Ratio: {self.estimate:.3f} ({self.ci_lower:.3f}-{self.ci_upper:.3f}){sig}"
@property def significant(self) -> bool: """True if the 95% CI does not contain the null value (1.0).""" return not (self.ci_lower <= self.null_value <= self.ci_upper) @property def p_value(self) -> Optional[float]: """Approximate two-sided p-value from the CI.""" import math if self.estimate <= 0: return None try: from scipy import stats as _stats se = (math.log(self.ci_upper) - math.log(self.ci_lower)) / (2 * 1.96) if se == 0: return None z = abs(math.log(self.estimate)) / se return float(2 * (1 - _stats.norm.cdf(z))) except Exception: return None
[docs] def to_dict(self) -> Dict: """Return a JSON-serializable dictionary.""" return { "measure": "odds_ratio", "estimate": self.estimate, "ci_lower": self.ci_lower, "ci_upper": self.ci_upper, "method": self.method, "significant": self.significant, "p_value": self.p_value, "null_value": self.null_value, "table": self.table.to_dict(), }
[docs] def plot(self, backend: str = "plotly", **kwargs): """Forest-style plot for this odds ratio.""" from api.results import make_association result = make_association( measure="odds_ratio", estimate=self.estimate, ci_lower=self.ci_lower, ci_upper=self.ci_upper, p_value=self.p_value, null_value=self.null_value, ) return result.plot(backend=backend, **kwargs)
[docs] class Table2x2: """ 2x2 contingency table for epidemiological calculations. Represents a standard 2x2 table layout: +----------------+-----------+-----------+ | | Exposed | Unexposed | +================+===========+===========+ | Cases | a | b | +----------------+-----------+-----------+ | Non-cases | c | d | +----------------+-----------+-----------+ Attributes: a (int): Exposed cases b (int): Unexposed cases c (int): Exposed non-cases d (int): Unexposed non-cases """ __slots__ = ('a', 'b', 'c', 'd', '_cache')
[docs] def __init__(self, a: int, b: int, c: int, d: int): """ Initialize a 2x2 contingency table. Args: a: Exposed cases (cell a) b: Unexposed cases (cell b) c: Exposed non-cases (cell c) d: Unexposed non-cases (cell d) Raises: ValueError: If any cell value is negative """ # Validate inputs if any(x < 0 for x in (a, b, c, d)): raise ValueError("All cell values must be non-negative") self.a = a self.b = b self.c = c self.d = d # Cache for optimized repeated calculations self._cache: Dict[str, float] = {}
# PROPERTIES & CACHED CALCULATIONS @property def total_exposed(self) -> int: """Total number of exposed individuals (a + c).""" return self.a + self.c @property def total_unexposed(self) -> int: """Total number of unexposed individuals (b + d).""" return self.b + self.d @property def total_cases(self) -> int: """Total number of cases (a + b).""" return self.a + self.b @property def total_non_cases(self) -> int: """Total number of non-cases (c + d).""" return self.c + self.d @property def total(self) -> int: """Total number of individuals (a + b + c + d).""" return self.a + self.b + self.c + self.d @property def risk_exposed(self) -> float: """Risk (incidence proportion) in exposed group: a / (a + c).""" key = "risk_exposed" if key not in self._cache: if self.total_exposed == 0: self._cache[key] = 0.0 else: self._cache[key] = self.a / self.total_exposed return self._cache[key] @property def risk_unexposed(self) -> float: """Risk (incidence proportion) in unexposed group: b / (b + d).""" key = "risk_unexposed" if key not in self._cache: if self.total_unexposed == 0: self._cache[key] = 0.0 else: self._cache[key] = self.b / self.total_unexposed return self._cache[key] @property def odds_exposed(self) -> float: """Odds in exposed group: a / c.""" key = "odds_exposed" if key not in self._cache: if self.c == 0: # Handle division by zero self._cache[key] = float('inf') if self.a > 0 else 0.0 else: self._cache[key] = self.a / self.c return self._cache[key] @property def odds_unexposed(self) -> float: """Odds in unexposed group: b / d.""" key = "odds_unexposed" if key not in self._cache: if self.d == 0: # Handle division by zero self._cache[key] = float('inf') if self.b > 0 else 0.0 else: self._cache[key] = self.b / self.d return self._cache[key] # CORE EPIDEMIOLOGICAL MEASURES
[docs] def risk_ratio(self, method: ConfidenceMethod = ConfidenceMethod.WALD, confidence: float = 0.95) -> RiskRatioResult: """ Calculate risk ratio (relative risk) with confidence interval. Risk Ratio = (a/(a+c)) / (b/(b+d)) Args: method: Method for confidence interval calculation confidence: Confidence level (default: 0.95 for 95% CI) Returns: RiskRatioResult object containing estimate and confidence interval Example: >>> table = Table2x2(10, 20, 30, 40) >>> result = table.risk_ratio() >>> print(result.estimate) 0.6667 """ # Calculate point estimate if self.risk_unexposed == 0: rr = float('inf') if self.risk_exposed > 0 else 0.0 else: rr = self.risk_exposed / self.risk_unexposed # Calculate confidence interval based on method if method == ConfidenceMethod.WALD: ci_lower, ci_upper = self._wald_ci_rr(rr, confidence) elif method == ConfidenceMethod.SCORE: ci_lower, ci_upper = self._score_ci_rr(confidence) else: # Default to delta method ci_lower, ci_upper = self._delta_ci_rr(rr, confidence) return RiskRatioResult( estimate=rr, ci_lower=ci_lower, ci_upper=ci_upper, method=method.value, table=self )
[docs] def odds_ratio(self, method: ConfidenceMethod = ConfidenceMethod.WALD, confidence: float = 0.95) -> OddsRatioResult: """ Calculate odds ratio with confidence interval. Odds Ratio = (a/c) / (b/d) = (a*d) / (b*c) Args: method: Method for confidence interval calculation confidence: Confidence level (default: 0.95 for 95% CI) Returns: OddsRatioResult object containing estimate and confidence interval Example: >>> table = Table2x2(10, 20, 30, 40) >>> result = table.odds_ratio() >>> print(result.estimate) 0.6667 """ # Calculate point estimate if self.b * self.c == 0: # Handle cases with zero cells if self.a * self.d > 0: or_est = float('inf') else: or_est = 0.0 else: or_est = (self.a * self.d) / (self.b * self.c) # Calculate confidence interval if method == ConfidenceMethod.EXACT: ci_lower, ci_upper = self._exact_ci_or(confidence) else: ci_lower, ci_upper = self._wald_ci_or(or_est, confidence) return OddsRatioResult( estimate=or_est, ci_lower=ci_lower, ci_upper=ci_upper, method=method.value, table=self )
[docs] def risk_difference(self, confidence: float = 0.95) -> Dict[str, float]: """ Calculate risk difference (attributable risk) with confidence interval. Risk Difference = Risk_exposed - Risk_unexposed Args: confidence: Confidence level (default: 0.95) Returns: Dictionary with 'estimate', 'ci_lower', and 'ci_upper' """ rd = self.risk_exposed - self.risk_unexposed # Wald-type CI for risk difference se = np.sqrt( (self.risk_exposed * (1 - self.risk_exposed) / self.total_exposed) + (self.risk_unexposed * (1 - self.risk_unexposed) / self.total_unexposed) ) z = self._z_score(confidence) ci_lower = rd - z * se ci_upper = rd + z * se return { "measure": "risk_difference", "estimate": rd, "ci_lower": ci_lower, "ci_upper": ci_upper, "confidence": confidence }
# ATTRIBUTABLE FRACTION MEASURES
[docs] def attributable_fraction_exposed(self) -> float: """ Calculate attributable fraction among the exposed. AF_exposed = (RR - 1) / RR Represents proportion of cases among exposed attributable to exposure. Returns: Attributable fraction (range: -∞ to 1) """ rr = self.risk_ratio().estimate if rr <= 0: return 0.0 return (rr - 1) / rr
# STATISTICAL TESTS
[docs] def chi_square(self, correction: bool = True) -> Dict[str, float]: """ Calculate chi-square test for association. Args: correction: Apply Yates' continuity correction if True Returns: Dictionary with 'chi2', 'p_value', and 'df' (degrees of freedom) """ # Expected frequencies total = self.total e_a = (self.total_cases * self.total_exposed) / total e_b = (self.total_cases * self.total_unexposed) / total e_c = (self.total_non_cases * self.total_exposed) / total e_d = (self.total_non_cases * self.total_unexposed) / total if correction: # Yates' corrected chi-square chi2 = ( (abs(self.a - e_a) - 0.5)**2 / e_a + (abs(self.b - e_b) - 0.5)**2 / e_b + (abs(self.c - e_c) - 0.5)**2 / e_c + (abs(self.d - e_d) - 0.5)**2 / e_d ) else: # Pearson chi-square chi2 = ( (self.a - e_a)**2 / e_a + (self.b - e_b)**2 / e_b + (self.c - e_c)**2 / e_c + (self.d - e_d)**2 / e_d ) # p-value from chi-square distribution with 1 degree of freedom from scipy import stats p_value = 1 - stats.chi2.cdf(chi2, df=1) return { "chi2": chi2, "p_value": p_value, "df": 1, "correction": "yates" if correction else "pearson" }
[docs] def fisher_exact(self) -> Dict[str, float]: """ Perform Fisher's exact test (especially for small samples). Returns: Dictionary with 'odds_ratio', 'p_value' (two-sided) """ from scipy import stats oddsratio, p_value = stats.fisher_exact([ [self.a, self.b], [self.c, self.d] ]) return { "odds_ratio": oddsratio, "p_value": p_value, "test": "fisher_exact" }
# CONFIDENCE INTERVAL METHODS def _wald_ci_rr(self, rr: float, confidence: float) -> Tuple[float, float]: """Wald confidence interval for risk ratio. Uses Haldane-Anscombe continuity correction (+0.5) for zero cells, which avoids ZeroDivisionError when a=0 or b=0 and provides a finite CI estimate consistent with epidemiological practice. """ if rr <= 0: return 0.0, 0.0 if np.isinf(rr): # RR = inf (unexposed risk = 0) — CI is (rr, inf) by convention return float('inf'), float('inf') # Apply Haldane-Anscombe +0.5 correction for any zero cell if self.a == 0 or self.b == 0: a = self.a + 0.5 b = self.b + 0.5 n1 = self.total_exposed + 0.5 n2 = self.total_unexposed + 0.5 # Recompute RR with corrected counts for the CI rr_corr = (a / n1) / (b / n2) else: a, b, n1, n2 = self.a, self.b, self.total_exposed, self.total_unexposed rr_corr = rr # Standard error of log(RR) — Greenland & Robins (1985) var_log_rr = (1/a - 1/n1) + (1/b - 1/n2) se_log_rr = np.sqrt(max(var_log_rr, 0)) z = self._z_score(confidence) log_ci_lower = np.log(rr_corr) - z * se_log_rr log_ci_upper = np.log(rr_corr) + z * se_log_rr return np.exp(log_ci_lower), np.exp(log_ci_upper) def _score_ci_rr(self, confidence: float) -> Tuple[float, float]: """Score confidence interval for risk ratio.""" # This is a simplified version - full implementation would be more complex rr_result = self.risk_ratio() return rr_result.ci_lower, rr_result.ci_upper def _delta_ci_rr(self, rr: float, confidence: float) -> Tuple[float, float]: """Delta method confidence interval for risk ratio.""" # Similar to Wald but with different variance estimator return self._wald_ci_rr(rr, confidence) def _wald_ci_or(self, or_est: float, confidence: float) -> Tuple[float, float]: """Wald confidence interval for odds ratio.""" if or_est <= 0: return 0.0, 0.0 # Standard error on log scale (Woolf's method) if 0 in (self.a, self.b, self.c, self.d): # Add 0.5 to all cells for zero-cell correction (Haldane-Anscombe) a = self.a + 0.5 b = self.b + 0.5 c = self.c + 0.5 d = self.d + 0.5 or_est = (a * d) / (b * c) else: a, b, c, d = self.a, self.b, self.c, self.d var_log_or = 1/a + 1/b + 1/c + 1/d se_log_or = np.sqrt(var_log_or) z = self._z_score(confidence) log_ci_lower = np.log(or_est) - z * se_log_or log_ci_upper = np.log(or_est) + z * se_log_or return np.exp(log_ci_lower), np.exp(log_ci_upper) def _exact_ci_or(self, confidence: float) -> Tuple[float, float]: """Exact (conditional maximum likelihood) CI for odds ratio.""" # For now, fall back to Woolf's method with correction # Full exact implementation would require more complex computation return self._wald_ci_or(self.odds_ratio().estimate, confidence) def _z_score(self, confidence: float) -> float: """Get z-score for given confidence level.""" from scipy import stats alpha = 1 - confidence return stats.norm.ppf(1 - alpha/2) # UTILITY METHODS
[docs] def to_dict(self) -> Dict[str, int]: """Convert table to dictionary representation.""" return { "a": self.a, "b": self.b, "c": self.c, "d": self.d, "total": self.total }
[docs] def __repr__(self) -> str: return (f"Table2x2(a={self.a}, b={self.b}, c={self.c}, d={self.d})")
[docs] def attributable_fraction_population(self) -> float: """ Attributable fraction in the population (PAF). PAF = Pe * (RR - 1) / (Pe * (RR - 1) + 1) where Pe = proportion of cases that are exposed = a / (a + b). Returns: Population attributable fraction (range: -∞ to 1) """ total_cases = self.total_cases if total_cases == 0: return 0.0 pe = self.a / total_cases # proportion exposed among cases rr = self.risk_ratio().estimate if rr <= 0 or np.isinf(rr): return 0.0 return pe * (rr - 1) / (pe * (rr - 1) + 1)
[docs] def summary(self) -> Dict[str, Union[float, int, Dict]]: """ Generate comprehensive summary of all calculations. Returns: Dictionary with all epidemiological measures """ rr_result = self.risk_ratio() or_result = self.odds_ratio() rd_result = self.risk_difference() chi2_result = self.chi_square() return { "table": self.to_dict(), "risks": { "exposed": self.risk_exposed, "unexposed": self.risk_unexposed }, "odds": { "exposed": self.odds_exposed, "unexposed": self.odds_unexposed }, "risk_ratio": rr_result.to_dict(), "odds_ratio": or_result.to_dict(), "risk_difference": rd_result, "attributable_fractions": { "exposed": self.attributable_fraction_exposed(), "population": self.attributable_fraction_population(), }, "chi_square": chi2_result, "fisher_exact": self.fisher_exact() }
# CONVENIENCE FUNCTIONS
[docs] def risk_ratio(a: int, b: int, c: int, d: int, **kwargs) -> RiskRatioResult: """ Convenience function to calculate risk ratio from raw counts. Args: a, b, c, d: 2x2 table cell counts **kwargs: Passed to Table2x2.risk_ratio() Returns: RiskRatioResult object Example: >>> result = risk_ratio(10, 20, 30, 40) >>> print(result.estimate) """ table = Table2x2(a, b, c, d) return table.risk_ratio(**kwargs)
[docs] def odds_ratio(a: int, b: int, c: int, d: int, **kwargs) -> OddsRatioResult: """ Convenience function to calculate odds ratio from raw counts. Args: a, b, c, d: 2x2 table cell counts **kwargs: Passed to Table2x2.odds_ratio() Returns: OddsRatioResult object """ table = Table2x2(a, b, c, d) return table.odds_ratio(**kwargs)
[docs] def from_dataframe(df, exposed_col: str, outcome_col: str) -> Table2x2: """ Create Table2x2 from pandas DataFrame. Args: df: pandas DataFrame exposed_col: Column name for exposure status (True/False or 1/0) outcome_col: Column name for outcome status (True/False or 1/0) Returns: Table2x2 object """ # Ensure boolean-like columns df = df.copy() for col in [exposed_col, outcome_col]: if df[col].dtype != bool: df[col] = df[col].astype(bool) # Calculate 2x2 counts a = df[df[exposed_col] & df[outcome_col]].shape[0] # Exposed cases b = df[~df[exposed_col] & df[outcome_col]].shape[0] # Unexposed cases c = df[df[exposed_col] & ~df[outcome_col]].shape[0] # Exposed non-cases d = df[~df[exposed_col] & ~df[outcome_col]].shape[0] # Unexposed non-cases return Table2x2(a, b, c, d)
# MODULE EXPORTS __all__ = [ 'Table2x2', 'RiskRatioResult', 'OddsRatioResult', 'ConfidenceMethod', 'risk_ratio', 'odds_ratio', 'from_dataframe' ]