Source code for episia.stats.regression

"""
This module provides functions for fitting and interpreting
regression models commonly used in epidemiology, including
logistic regression for binary outcomes and Poisson regression
for count data.
"""

import math
import numpy as np
from typing import Tuple, Optional, Dict, List
from dataclasses import dataclass
from enum import Enum
from scipy import stats
import pandas as pd


[docs] class RegressionType(Enum): """Types of regression models.""" LOGISTIC = "logistic" POISSON = "poisson" LINEAR = "linear"
[docs] class ModelSelection(Enum): """Model selection criteria.""" AIC = "aic" BIC = "bic" LIKELIHOOD_RATIO = "likelihood_ratio"
[docs] @dataclass class RegressionResult: """Result object for regression analysis.""" coefficients: np.ndarray odds_ratios: np.ndarray ci_lower: np.ndarray ci_upper: np.ndarray p_values: np.ndarray variable_names: List[str] model_type: str n_observations: int log_likelihood: float aic: float bic: float convergence: bool iterations: int
[docs] def __repr__(self) -> str: return f"RegressionResult({self.model_type}, n={self.n_observations}, AIC={self.aic:.1f})"
[docs] def summary(self, decimal_places: int = 3) -> pd.DataFrame: """ Create summary table of regression results. Args: decimal_places: Number of decimal places for display Returns: pandas DataFrame with results """ data = [] for i, var in enumerate(self.variable_names): if self.model_type == "logistic": measure = self.odds_ratios[i] ci_str = f"{self.ci_lower[i]:.{decimal_places}f}-{self.ci_upper[i]:.{decimal_places}f}" else: measure = self.coefficients[i] ci_str = f"{self.ci_lower[i]:.{decimal_places}f}-{self.ci_upper[i]:.{decimal_places}f}" data.append({ "Variable": var, "Coefficient": round(self.coefficients[i], decimal_places), "OR" if self.model_type == "logistic" else "Coefficient": round(measure, decimal_places), "95% CI": ci_str, "p-value": f"{self.p_values[i]:.{decimal_places}f}" if self.p_values[i] >= 0.001 else "<0.001" }) df = pd.DataFrame(data) # Add model statistics stats_df = pd.DataFrame([{ "Variable": "Model Statistics", "Coefficient": f"n={self.n_observations}", "OR" if self.model_type == "logistic" else "Coefficient": f"LL={self.log_likelihood:.1f}", "95% CI": f"AIC={self.aic:.1f}", "p-value": f"BIC={self.bic:.1f}" }]) return pd.concat([df, stats_df], ignore_index=True)
[docs] def predict(self, X: np.ndarray) -> np.ndarray: """ Predict probabilities or expected counts. Args: X: Design matrix (including intercept if needed) Returns: Predicted values """ if self.model_type == "logistic": linear_predictor = X @ self.coefficients return 1 / (1 + np.exp(-linear_predictor)) else: return np.exp(X @ self.coefficients)
[docs] def logistic_regression( X: np.ndarray, y: np.ndarray, variable_names: Optional[List[str]] = None, add_intercept: bool = True, method: str = "irls", max_iter: int = 100, tol: float = 1e-6 ) -> RegressionResult: """ Fit logistic regression model for binary outcomes. Args: X: Design matrix (n_samples, n_features) y: Binary outcome (0 or 1) variable_names: Names of predictor variables add_intercept: Whether to add intercept term method: Fitting method ('irls' or 'newton') max_iter: Maximum iterations tol: Convergence tolerance Returns: RegressionResult object Example: >>> X = np.array([[1, 25], [1, 30], [1, 35], [0, 40]]) >>> y = np.array([1, 1, 0, 0]) >>> result = logistic_regression(X, y, ['exposed', 'age']) """ # Input validation X = np.asarray(X, dtype=float) y = np.asarray(y, dtype=float) if X.shape[0] != len(y): raise ValueError("X and y must have same number of observations") if not np.all(np.isin(y, [0, 1])): raise ValueError("y must contain only 0 and 1 values") # Add intercept if requested if add_intercept: X = np.column_stack([np.ones(X.shape[0]), X]) if variable_names is not None: variable_names = ["Intercept"] + variable_names else: variable_names = ["Intercept"] + [f"X{i}" for i in range(X.shape[1] - 1)] elif variable_names is None: variable_names = [f"X{i}" for i in range(X.shape[1])] n_obs, n_vars = X.shape # Initialize coefficients beta = np.zeros(n_vars) if method == "irls": beta, converged, iterations, log_likelihood = _irls_logistic(X, y, beta, max_iter, tol) elif method == "newton": beta, converged, iterations, log_likelihood = _newton_logistic(X, y, beta, max_iter, tol) else: raise ValueError(f"Unknown method: {method}") # Calculate standard errors and confidence intervals se, p_values = _logistic_standard_errors(X, y, beta) # Odds ratios and confidence intervals odds_ratios = np.exp(beta) z = stats.norm.ppf(0.975) ci_lower = np.exp(beta - z * se) ci_upper = np.exp(beta + z * se) # Model fit statistics aic = -2 * log_likelihood + 2 * n_vars bic = -2 * log_likelihood + n_vars * np.log(n_obs) return RegressionResult( coefficients=beta, odds_ratios=odds_ratios, ci_lower=ci_lower, ci_upper=ci_upper, p_values=p_values, variable_names=variable_names, model_type="logistic", n_observations=n_obs, log_likelihood=log_likelihood, aic=aic, bic=bic, convergence=converged, iterations=iterations )
def _irls_logistic( X: np.ndarray, y: np.ndarray, beta_init: np.ndarray, max_iter: int, tol: float ) -> Tuple[np.ndarray, bool, int, float]: """ Iteratively Reweighted Least Squares for logistic regression. """ beta = beta_init.copy() n_obs = X.shape[0] for iteration in range(max_iter): # Current predictions linear_predictor = X @ beta p = 1 / (1 + np.exp(-linear_predictor)) # Avoid extreme probabilities p = np.clip(p, 1e-10, 1 - 1e-10) # Weights and working response W = np.diag(p * (1 - p)) z = linear_predictor + (y - p) / (p * (1 - p)) # Update beta XTW = X.T @ W XTWX = XTW @ X XTWz = XTW @ z try: beta_new = np.linalg.solve(XTWX, XTWz) except np.linalg.LinAlgError: # Use pseudo-inverse if singular beta_new = np.linalg.pinv(XTWX) @ XTWz # Check convergence if np.max(np.abs(beta_new - beta)) < tol: beta = beta_new break beta = beta_new # Calculate log-likelihood linear_predictor = X @ beta p = 1 / (1 + np.exp(-linear_predictor)) p = np.clip(p, 1e-10, 1 - 1e-10) log_likelihood = np.sum(y * np.log(p) + (1 - y) * np.log(1 - p)) converged = iteration < max_iter - 1 return beta, converged, iteration + 1, log_likelihood def _newton_logistic( X: np.ndarray, y: np.ndarray, beta_init: np.ndarray, max_iter: int, tol: float ) -> Tuple[np.ndarray, bool, int, float]: """ Newton-Raphson method for logistic regression. """ beta = beta_init.copy() for iteration in range(max_iter): linear_predictor = X @ beta p = 1 / (1 + np.exp(-linear_predictor)) p = np.clip(p, 1e-10, 1 - 1e-10) # Gradient gradient = X.T @ (y - p) # Hessian W = np.diag(p * (1 - p)) hessian = -X.T @ W @ X try: # Newton update delta = np.linalg.solve(-hessian, gradient) except np.linalg.LinAlgError: # Use gradient descent if Hessian is singular delta = 0.01 * gradient beta_new = beta - delta # Check convergence if np.max(np.abs(beta_new - beta)) < tol: beta = beta_new break beta = beta_new # Log-likelihood linear_predictor = X @ beta p = 1 / (1 + np.exp(-linear_predictor)) p = np.clip(p, 1e-10, 1 - 1e-10) log_likelihood = np.sum(y * np.log(p) + (1 - y) * np.log(1 - p)) converged = iteration < max_iter - 1 return beta, converged, iteration + 1, log_likelihood def _logistic_standard_errors( X: np.ndarray, y: np.ndarray, beta: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Calculate standard errors and p-values for logistic regression. """ n_obs = X.shape[0] # Predictions at final beta linear_predictor = X @ beta p = 1 / (1 + np.exp(-linear_predictor)) p = np.clip(p, 1e-10, 1 - 1e-10) # Fisher information matrix W = np.diag(p * (1 - p)) information = X.T @ W @ X try: # Variance-covariance matrix vcov = np.linalg.inv(information) except np.linalg.LinAlgError: # Use pseudo-inverse if singular vcov = np.linalg.pinv(information) # Standard errors — clip negative variances from numerical noise se = np.sqrt(np.maximum(np.diag(vcov), 0.0)) # Wald test statistics — guard against SE=0 (degenerate cases) with np.errstate(divide="ignore", invalid="ignore"): z_scores = np.where(se > 0, beta / se, 0.0) # Two-sided p-values p_values = 2 * (1 - stats.norm.cdf(np.abs(z_scores))) return se, p_values
[docs] def poisson_regression( X: np.ndarray, y: np.ndarray, offset: Optional[np.ndarray] = None, variable_names: Optional[List[str]] = None, add_intercept: bool = True, max_iter: int = 100, tol: float = 1e-6 ) -> RegressionResult: """ Fit Poisson regression model for count data. Args: X: Design matrix y: Count outcome (non-negative integers) offset: Offset term (e.g., log(person-time)) variable_names: Names of predictor variables add_intercept: Whether to add intercept term max_iter: Maximum iterations tol: Convergence tolerance Returns: RegressionResult object """ X = np.asarray(X, dtype=float) y = np.asarray(y, dtype=float) if np.any(y < 0): raise ValueError("y must contain non-negative values") if offset is not None: offset = np.asarray(offset, dtype=float) if len(offset) != len(y): raise ValueError("offset must have same length as y") else: offset = np.zeros(len(y)) # Add intercept if add_intercept: X = np.column_stack([np.ones(X.shape[0]), X]) if variable_names is not None: variable_names = ["Intercept"] + variable_names else: variable_names = ["Intercept"] + [f"X{i}" for i in range(X.shape[1] - 1)] elif variable_names is None: variable_names = [f"X{i}" for i in range(X.shape[1])] n_obs, n_vars = X.shape # Initialize coefficients beta = np.zeros(n_vars) # Fit using Newton-Raphson for iteration in range(max_iter): linear_predictor = X @ beta + offset mu = np.exp(linear_predictor) # Avoid extreme values mu = np.clip(mu, 1e-10, 1e10) # Gradient gradient = X.T @ (y - mu) # Hessian W = np.diag(mu) hessian = -X.T @ W @ X try: delta = np.linalg.solve(-hessian, gradient) except np.linalg.LinAlgError: delta = 0.01 * gradient beta_new = beta + delta if np.max(np.abs(beta_new - beta)) < tol: beta = beta_new break beta = beta_new # Calculate log-likelihood linear_predictor = X @ beta + offset mu = np.exp(linear_predictor) mu = np.clip(mu, 1e-10, 1e10) # math.lgamma(n+1) = log(n!) — vectorized, compatible with all NumPy versions log_factorial = np.array([math.lgamma(int(yi) + 1) for yi in y]) log_likelihood = np.sum(y * np.log(mu) - mu - log_factorial) # Standard errors W = np.diag(mu) information = X.T @ W @ X try: vcov = np.linalg.inv(information) except np.linalg.LinAlgError: vcov = np.linalg.pinv(information) se = np.sqrt(np.diag(vcov)) # Rate ratios and confidence intervals rate_ratios = np.exp(beta) z = stats.norm.ppf(0.975) ci_lower = np.exp(beta - z * se) ci_upper = np.exp(beta + z * se) # P-values z_scores = beta / se p_values = 2 * (1 - stats.norm.cdf(np.abs(z_scores))) # Model statistics aic = -2 * log_likelihood + 2 * n_vars bic = -2 * log_likelihood + n_vars * np.log(n_obs) return RegressionResult( coefficients=beta, odds_ratios=rate_ratios, # Actually rate ratios for Poisson ci_lower=ci_lower, ci_upper=ci_upper, p_values=p_values, variable_names=variable_names, model_type="poisson", n_observations=n_obs, log_likelihood=log_likelihood, aic=aic, bic=bic, convergence=iteration < max_iter - 1, iterations=iteration + 1 )
[docs] def likelihood_ratio_test( model_full: RegressionResult, model_reduced: RegressionResult ) -> Dict[str, float]: """ Perform likelihood ratio test between nested models. Args: model_full: Full model (more parameters) model_reduced: Reduced model (fewer parameters) Returns: Dictionary with test statistics """ if model_full.model_type != model_reduced.model_type: raise ValueError("Models must be of same type") if model_full.n_observations != model_reduced.n_observations: raise ValueError("Models must be fit on same data") # Test statistic lr_stat = 2 * (model_full.log_likelihood - model_reduced.log_likelihood) # Degrees of freedom df = len(model_full.coefficients) - len(model_reduced.coefficients) # P-value p_value = 1 - stats.chi2.cdf(lr_stat, df) if df > 0 else 1.0 return { "lr_statistic": lr_stat, "df": df, "p_value": p_value, "full_aic": model_full.aic, "reduced_aic": model_reduced.aic, "full_bic": model_full.bic, "reduced_bic": model_reduced.bic }
[docs] def hosmer_lemeshow_test( y_true: np.ndarray, y_pred: np.ndarray, n_groups: int = 10 ) -> Dict[str, float]: """ Hosmer-Lemeshow goodness-of-fit test for logistic regression. Args: y_true: True binary outcomes y_pred: Predicted probabilities n_groups: Number of groups to form Returns: Dictionary with test statistics """ # Sort by predicted probability sort_idx = np.argsort(y_pred) y_true_sorted = y_true[sort_idx] y_pred_sorted = y_pred[sort_idx] # Create groups group_size = len(y_true) // n_groups chi2 = 0.0 observed_counts = [] expected_counts = [] for g in range(n_groups): start = g * group_size end = start + group_size if g < n_groups - 1 else len(y_true) group_true = y_true_sorted[start:end] group_pred = y_pred_sorted[start:end] observed = np.sum(group_true) expected = np.sum(group_pred) if expected > 0: chi2 += (observed - expected)**2 / (expected * (1 - expected / len(group_true))) observed_counts.append(observed) expected_counts.append(expected) df = n_groups - 2 p_value = 1 - stats.chi2.cdf(chi2, df) if df > 0 else 1.0 return { "chi2": chi2, "df": df, "p_value": p_value, "n_groups": n_groups, "observed": observed_counts, "expected": expected_counts }
[docs] def calculate_vif(X: np.ndarray) -> Dict[str, float]: """ Calculate Variance Inflation Factors for multicollinearity detection. Args: X: Design matrix (without intercept) Returns: Dictionary with VIF for each variable """ X = np.asarray(X, dtype=float) n_vars = X.shape[1] # Add intercept X_with_intercept = np.column_stack([np.ones(X.shape[0]), X]) vif = {} for i in range(1, n_vars + 1): # Skip intercept # Regress variable i on all other variables y = X_with_intercept[:, i] X_other = np.delete(X_with_intercept, i, axis=1) # Solve normal equations try: beta = np.linalg.solve(X_other.T @ X_other, X_other.T @ y) y_pred = X_other @ beta ss_res = np.sum((y - y_pred)**2) ss_tot = np.sum((y - np.mean(y))**2) r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 except np.linalg.LinAlgError: r2 = 0 vif_value = 1 / (1 - r2) if r2 < 1 else float('inf') vif[f"X{i}"] = vif_value return vif
[docs] def stepwise_selection( X: np.ndarray, y: np.ndarray, model_type: RegressionType = RegressionType.LOGISTIC, direction: str = "both", criterion: ModelSelection = ModelSelection.AIC, max_vars: Optional[int] = None ) -> Dict: """ Perform stepwise variable selection. Args: X: Design matrix y: Outcome model_type: Type of regression model direction: 'forward', 'backward', or 'both' criterion: Selection criterion max_vars: Maximum number of variables to include Returns: Dictionary with selected model and steps """ n_obs, n_vars = X.shape if max_vars is None: max_vars = min(n_vars, n_obs // 10) # Rule of thumb # Variable indices current_vars = set() candidate_vars = set(range(n_vars)) steps = [] best_model = None best_criterion = float('inf') if direction in ["forward", "both"]: # Forward selection while len(current_vars) < max_vars and candidate_vars: best_var = None best_score = float('inf') for var in candidate_vars: test_vars = list(current_vars | {var}) X_test = X[:, test_vars] if model_type == RegressionType.LOGISTIC: model = logistic_regression(X_test, y, add_intercept=True) else: model = poisson_regression(X_test, y, add_intercept=True) score = model.aic if criterion == ModelSelection.AIC else model.bic if score < best_score: best_score = score best_var = var if best_var is not None: current_vars.add(best_var) candidate_vars.remove(best_var) steps.append({ "step": len(steps) + 1, "action": "add", "variable": best_var, "criterion": best_score }) if best_score < best_criterion: best_criterion = best_score best_model = model if direction in ["backward", "both"] and len(current_vars) > 1: # Backward elimination (simplified) pass return { "selected_variables": list(current_vars), "best_model": best_model, "steps": steps, "final_criterion": best_criterion, "direction": direction }
[docs] def roc_auc_from_logistic( model: RegressionResult, X: np.ndarray, y: np.ndarray ) -> float: """ Calculate ROC AUC from logistic regression model. Args: model: Fitted logistic regression model X: Design matrix (with intercept if model has it) y: True outcomes Returns: AUC value """ # Predict probabilities y_pred = model.predict(X) # Calculate AUC (simplified) from sklearn.metrics import roc_auc_score try: auc = roc_auc_score(y, y_pred) except: # Fallback calculation n_pos = np.sum(y == 1) n_neg = np.sum(y == 0) if n_pos == 0 or n_neg == 0: return 0.5 # Manual AUC calculation pos_pred = y_pred[y == 1] neg_pred = y_pred[y == 0] auc = 0 for p in pos_pred: auc += np.sum(neg_pred < p) + 0.5 * np.sum(neg_pred == p) auc /= (n_pos * n_neg) return auc
[docs] def interaction_term( X1: np.ndarray, X2: np.ndarray, center: bool = True ) -> np.ndarray: """ Create interaction term for regression. Args: X1: First variable X2: Second variable center: Whether to center variables before multiplying Returns: Interaction term """ X1 = np.asarray(X1, dtype=float) X2 = np.asarray(X2, dtype=float) if center: X1_centered = X1 - np.mean(X1) X2_centered = X2 - np.mean(X2) interaction = X1_centered * X2_centered else: interaction = X1 * X2 return interaction
# MODULE EXPORTS __all__ = [ 'RegressionType', 'ModelSelection', 'RegressionResult', 'logistic_regression', 'poisson_regression', 'likelihood_ratio_test', 'hosmer_lemeshow_test', 'calculate_vif', 'stepwise_selection', 'roc_auc_from_logistic', 'interaction_term' ]