Source code for episia.stats.time_series

"""
This module provides functions for analyzing temporal patterns in
epidemiological data, including epidemic curves, incidence rates,
and temporal trend analysis.
"""

import numpy as np
from typing import Union, Tuple, Optional, Dict, List
from dataclasses import dataclass
from enum import Enum
import warnings
from scipy import stats, signal
import pandas as pd


[docs] class TimeAggregation(Enum): """Time aggregation methods.""" DAILY = "daily" WEEKLY = "weekly" MONTHLY = "monthly" YEARLY = "yearly"
[docs] class TrendMethod(Enum): """Methods for trend analysis.""" LINEAR = "linear" LOESS = "loess" SPLINE = "spline" MOVING_AVERAGE = "moving_average"
[docs] @dataclass class EpidemicCurve: """Container for epidemic curve data.""" dates: np.ndarray counts: np.ndarray aggregated: bool aggregation: Optional[str] = None metadata: Optional[Dict] = None
[docs] def __post_init__(self): """Validate data.""" if len(self.dates) != len(self.counts): raise ValueError("Dates and counts must have same length") if self.metadata is None: self.metadata = {}
[docs] def to_dataframe(self) -> pd.DataFrame: """Convert to pandas DataFrame.""" return pd.DataFrame({ 'date': self.dates, 'count': self.counts })
[docs] def summary(self) -> Dict: """Calculate summary statistics.""" return { "total_cases": int(np.sum(self.counts)), "mean_daily": float(np.mean(self.counts)), "max_daily": int(np.max(self.counts)), "peak_date": self.dates[np.argmax(self.counts)], "duration_days": len(self.counts), "start_date": self.dates[0], "end_date": self.dates[-1] }
[docs] @dataclass class TimeSeriesResult: """Result object for time series analysis.""" dates: np.ndarray observed: np.ndarray predicted: Optional[np.ndarray] = None residuals: Optional[np.ndarray] = None trend: Optional[np.ndarray] = None seasonality: Optional[np.ndarray] = None method: Optional[str] = None metrics: Optional[Dict] = None
[docs] def __repr__(self) -> str: if self.metrics: return f"TimeSeries (R²={self.metrics.get('r_squared', 0):.3f})" return "TimeSeriesResult"
[docs] def plot_data(self) -> Dict: """Prepare data for plotting.""" data = { "dates": self.dates, "observed": self.observed, "trend": self.trend } if self.predicted is not None: data["predicted"] = self.predicted return data
[docs] def calculate_incidence( cases: np.ndarray, population: Union[float, np.ndarray], time_period: float = 1.0 ) -> np.ndarray: """ Calculate incidence rates. Args: cases: Number of cases population: Population at risk (scalar or array) time_period: Time period for rate (default: 1 unit) Returns: Incidence rates per time period Example: >>> calculate_incidence([10, 20, 30], 1000) array([0.01, 0.02, 0.03]) # per time unit """ cases = np.asarray(cases, dtype=float) population = np.asarray(population, dtype=float) if np.any(population <= 0): raise ValueError("Population must be positive") incidence = cases / population * time_period return incidence
[docs] def calculate_attack_rate( cases: np.ndarray, population: Union[float, np.ndarray] ) -> np.ndarray: """ Calculate attack rates (cumulative incidence). Args: cases: Cumulative cases over time population: Population at risk Returns: Attack rates (proportion) """ cases = np.asarray(cases, dtype=float) population = np.asarray(population, dtype=float) if np.any(population <= 0): raise ValueError("Population must be positive") attack_rate = cases / population # Ensure values are between 0 and 1 attack_rate = np.clip(attack_rate, 0, 1) return attack_rate
[docs] def epidemic_curve( dates: np.ndarray, counts: np.ndarray, aggregation: TimeAggregation = TimeAggregation.DAILY, fill_missing: bool = True ) -> EpidemicCurve: """ Create epidemic curve from date-case data. Args: dates: Array of dates counts: Array of case counts aggregation: Time aggregation level fill_missing: Fill missing dates with zeros Returns: EpidemicCurve object """ # Convert to pandas Series for easy manipulation if not isinstance(dates, pd.DatetimeIndex): dates = pd.to_datetime(dates) series = pd.Series(counts, index=dates) # Resample based on aggregation if aggregation == TimeAggregation.WEEKLY: resampled = series.resample('W').sum() agg_label = "weekly" elif aggregation == TimeAggregation.MONTHLY: resampled = series.resample('M').sum() agg_label = "monthly" elif aggregation == TimeAggregation.YEARLY: resampled = series.resample('Y').sum() agg_label = "yearly" else: # DAILY if fill_missing: # Create complete date range full_range = pd.date_range(start=series.index.min(), end=series.index.max(), freq='D') resampled = series.reindex(full_range, fill_value=0) else: resampled = series agg_label = "daily" return EpidemicCurve( dates=resampled.index.values, counts=resampled.values, aggregated=aggregation != TimeAggregation.DAILY, aggregation=agg_label )
[docs] def moving_average( data: np.ndarray, window: int = 7, center: bool = True ) -> np.ndarray: """ Calculate moving average for smoothing time series. Args: data: Time series data window: Window size for moving average center: Whether to center the window Returns: Smoothed time series """ if window <= 0: raise ValueError("Window must be positive") if window > len(data): window = len(data) warnings.warn(f"Window reduced to data length: {window}") # Use pandas for robust handling series = pd.Series(data) smoothed = series.rolling(window=window, center=center, min_periods=1).mean() return smoothed.values
[docs] def loess_smoothing( x: np.ndarray, y: np.ndarray, frac: float = 0.3, iterations: int = 1 ) -> np.ndarray: """ LOESS (Local Regression) smoothing. Args: x: Time points (equally spaced recommended) y: Observations frac: Fraction of data to use for local regression iterations: Number of robustness iterations Returns: Smoothed values """ from statsmodels.nonparametric.smoothers_lowess import lowess if len(x) != len(y): raise ValueError("x and y must have same length") if frac <= 0 or frac > 1: raise ValueError("frac must be between 0 and 1") # LOESS smoothing smoothed = lowess(y, x, frac=frac, it=iterations, return_sorted=False) return smoothed
[docs] def detect_epidemic_threshold( counts: np.ndarray, method: str = "moving_average", window: int = 7, multiplier: float = 2.0 ) -> Dict: """ Detect epidemic threshold using various methods. Args: counts: Daily case counts method: Detection method window: Window size for baseline multiplier: Multiplier for threshold Returns: Dictionary with threshold and flags """ counts = np.asarray(counts) if method == "moving_average": baseline = moving_average(counts, window=window) std = np.std(counts[:window]) if len(counts) >= window else np.std(counts) threshold = baseline + multiplier * std elif method == "percentile": baseline = np.percentile(counts, 75) threshold = baseline * multiplier elif method == "cumu_sum": # Cumulative sum method mean = np.mean(counts) std = np.std(counts) threshold = mean + multiplier * std else: raise ValueError(f"Unknown method: {method}") # Identify epidemic periods epidemic_flags = counts > threshold # Calculate epidemic metrics epidemic_days = np.sum(epidemic_flags) epidemic_periods = _find_contiguous_regions(epidemic_flags) return { "threshold": threshold, "baseline": baseline if method == "moving_average" else None, "epidemic_flags": epidemic_flags, "epidemic_days": epidemic_days, "epidemic_periods": epidemic_periods, "method": method }
def _find_contiguous_regions(flags: np.ndarray) -> List[Tuple[int, int]]: """Find contiguous True regions in boolean array.""" regions = [] start = None for i, flag in enumerate(flags): if flag and start is None: start = i elif not flag and start is not None: regions.append((start, i-1)) start = None if start is not None: regions.append((start, len(flags)-1)) return regions
[docs] def reproductive_number( incidence: np.ndarray, serial_interval: float = 5.0, method: str = "cori" ) -> np.ndarray: """ Estimate time-varying reproductive number (R_t). Args: incidence: Daily incidence serial_interval: Mean serial interval in days method: Estimation method Returns: Estimated R_t over time """ incidence = np.asarray(incidence, dtype=float) if method == "simple": # Simple moving average ratio window = int(serial_interval) if window < 1: window = 1 # Pad with zeros for beginning padded = np.concatenate([np.zeros(window), incidence]) R_t = np.zeros(len(incidence)) for t in range(len(incidence)): numerator = np.sum(padded[t+1:t+window+1]) denominator = np.sum(padded[t-window+1:t+1]) if denominator > 0: R_t[t] = numerator / denominator else: R_t[t] = 0.0 elif method == "cori": # Cori et al. method (simplified) # This is a simplified version R_t = np.zeros(len(incidence)) for t in range(1, len(incidence)): if incidence[t-1] > 0: R_t[t] = incidence[t] / incidence[t-1] else: R_t[t] = 0.0 # Smooth with serial interval R_t = moving_average(R_t, window=int(serial_interval)) else: raise ValueError(f"Unknown method: {method}") return R_t
[docs] def seasonality_decomposition( time_series: np.ndarray, period: int = 365, model: str = "additive" ) -> Dict[str, np.ndarray]: """ Decompose time series into trend, seasonal, and residual components. Args: time_series: Time series data period: Seasonal period (e.g., 365 for daily annual) model: 'additive' or 'multiplicative' Returns: Dictionary with components """ from statsmodels.tsa.seasonal import seasonal_decompose # Ensure enough data points if len(time_series) < 2 * period: warnings.warn(f"Insufficient data for period {period}") period = min(period, len(time_series) // 2) # Create time index if isinstance(time_series, pd.Series): series = time_series else: series = pd.Series(time_series) # Perform decomposition result = seasonal_decompose(series, model=model, period=period, extrapolate_trend='freq') return { "observed": result.observed.values, "trend": result.trend.values, "seasonal": result.seasonal.values, "residual": result.resid.values, "period": period, "model": model }
[docs] def exponential_growth_rate( cases: np.ndarray, time_points: Optional[np.ndarray] = None ) -> Dict[str, float]: """ Calculate exponential growth rate from case counts. Args: cases: Case counts over time time_points: Time points (optional) Returns: Dictionary with growth rate and doubling time """ cases = np.asarray(cases, dtype=float) # Remove zeros for log transformation valid_mask = cases > 0 if np.sum(valid_mask) < 2: return {"growth_rate": 0.0, "doubling_time": float('inf'), "r_squared": 0.0} cases_valid = cases[valid_mask] if time_points is None: time_points = np.arange(len(cases))[valid_mask] else: time_points = np.asarray(time_points)[valid_mask] # Linear regression on log cases log_cases = np.log(cases_valid) slope, intercept, r_value, p_value, std_err = stats.linregress(time_points, log_cases) # Growth rate (per time unit) growth_rate = slope # Doubling time if growth_rate > 0: doubling_time = np.log(2) / growth_rate else: doubling_time = float('inf') return { "growth_rate": growth_rate, "doubling_time": doubling_time, "intercept": intercept, "r_squared": r_value**2, "p_value": p_value, "std_err": std_err }
[docs] def nowcasting( reported_cases: np.ndarray, delay_distribution: np.ndarray, method: str = "simple" ) -> np.ndarray: """ Perform nowcasting to estimate true incidence accounting for reporting delays. Args: reported_cases: Cases reported by date of report delay_distribution: Probability distribution of reporting delays method: Nowcasting method Returns: Nowcasted incidence """ reported_cases = np.asarray(reported_cases, dtype=float) delay_distribution = np.asarray(delay_distribution, dtype=float) # Normalize delay distribution delay_distribution = delay_distribution / np.sum(delay_distribution) n_days = len(reported_cases) n_delays = len(delay_distribution) if method == "simple": # Simple deconvolution nowcasted = np.zeros(n_days) for t in range(n_days): total = 0 for d in range(min(n_delays, t + 1)): total += reported_cases[t - d] * delay_distribution[d] nowcasted[t] = total else: raise ValueError(f"Unknown method: {method}") return nowcasted
[docs] def cumulative_curve( daily_cases: np.ndarray ) -> np.ndarray: """ Calculate cumulative epidemic curve. Args: daily_cases: Daily new cases Returns: Cumulative cases """ return np.cumsum(daily_cases)
[docs] def detect_peaks( time_series: np.ndarray, height: Optional[float] = None, distance: int = 7, prominence: Optional[float] = None ) -> Dict: """ Detect peaks in time series data. Args: time_series: Time series data height: Minimum height of peaks distance: Minimum distance between peaks prominence: Minimum prominence of peaks Returns: Dictionary with peak indices and properties """ peaks, properties = signal.find_peaks( time_series, height=height, distance=distance, prominence=prominence ) return { "peak_indices": peaks, "peak_heights": properties.get('peak_heights', []), "prominences": properties.get('prominences', []), "left_bases": properties.get('left_bases', []), "right_bases": properties.get('right_bases', []), "n_peaks": len(peaks) }
# MODULE EXPORTS __all__ = [ 'TimeAggregation', 'TrendMethod', 'EpidemicCurve', 'TimeSeriesResult', 'calculate_incidence', 'calculate_attack_rate', 'epidemic_curve', 'moving_average', 'loess_smoothing', 'detect_epidemic_threshold', 'reproductive_number', 'seasonality_decomposition', 'exponential_growth_rate', 'nowcasting', 'cumulative_curve', 'detect_peaks' ]