time_series Module#

Temporal analysis of epidemiological data.

This module provides functions for analyzing temporal patterns in epidemiological data, including epidemic curves, incidence rates, and temporal trend analysis.

Classes#

class episia.stats.time_series.TimeAggregation(value)[source]#

Bases: Enum

Time aggregation methods.

DAILY = 'daily'#
MONTHLY = 'monthly'#
WEEKLY = 'weekly'#
YEARLY = 'yearly'#
class episia.stats.time_series.TrendMethod(value)[source]#

Bases: Enum

Methods for trend analysis.

LINEAR = 'linear'#
LOESS = 'loess'#
MOVING_AVERAGE = 'moving_average'#
SPLINE = 'spline'#
class episia.stats.time_series.EpidemicCurve(dates, counts, aggregated, aggregation=None, metadata=None)[source]#

Bases: object

Container for epidemic curve data.

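Parameters:
  • dates (ndarray)

  • counts (ndarray)

  • aggregated (bool)

  • aggregation (str | None)

  • metadata (Dict | None)
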
__init__(dates, counts, aggregated, aggregation=None, metadata=None)#
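Parameters:
  • dates (ndarray)

  • counts (ndarray)

  • aggregated (bool)

  • aggregation (str | None)

  • metadata (Dict | None)
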
Return type:

None

__post_init__()[source]#

Validate data.

aggregated: bool#
aggregation: str | None = None#
counts: ndarray#
dates: ndarray#
metadata: Dict | None = None#
summary()[source]#

Calculate summary statistics.

Return type:

Dict

to_dataframe()[source]#

Convert to pandas DataFrame.

Return type:

DataFrame

class episia.stats.time_series.TimeSeriesResult(dates, observed, predicted=None, residuals=None, trend=None, seasonality=None, method=None, metrics=None)[source]#

Bases: object

Result object for time series analysis.

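Parameters:
  • dates (ndarray)

  • observed (ndarray)

  • predicted (ndarray | None)

  • residuals (ndarray | None)

  • trend (ndarray | None)

  • seasonality (ndarray | None)

  • method (str | None)

  • metrics (Dict | None)
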
__repr__()[source]#

Return repr(self).

Return type:

str

dates: ndarray#
method: str | None = None#
metrics: Dict | None = None#
observed: ndarray#
plot_data()[source]#

Prepare data for plotting.

Return type:

Dict

predicted: ndarray | None = None#
residuals: ndarray | None = None#
seasonality: ndarray | None = None#
trend: ndarray | None = None#

Functions#

episia.stats.time_series.calculate_incidence(cases, population, time_period=1.0)[source]#

Calculate incidence rates.

Parameters:
  • cases (ndarray) – Number of cases

  • population (float | ndarray) – Population at risk (scalar or array)

  • time_period (float) – Time period for rate (default: 1 unit)

Returns:

Incidence rates per time period

Return type:

ndarray

Example

>>> calculate_incidence([10, 20, 30], 1000)
array([0.01, 0.02, 0.03])  # per time unit
episia.stats.time_series.calculate_attack_rate(cases, population)[source]#

Calculate attack rates (cumulative incidence).

Parameters:
  • cases (ndarray) – Cumulative cases over time

  • population (float | ndarray) – Population at risk

Returns:

Attack rates (proportion)

Return type:

ndarray

episia.stats.time_series.epidemic_curve(dates, counts, aggregation=TimeAggregation.DAILY, fill_missing=True)[source]#

Create epidemic curve from date-case data.

Parameters:
  • dates (ndarray) – Array of dates

  • counts (ndarray) – Array of case counts

  • aggregation (TimeAggregation) – Time aggregation level

  • fill_missing (bool) – Fill missing dates with zeros

Returns:

EpidemicCurve object

Return type:

EpidemicCurve

episia.stats.time_series.moving_average(data, window=7, center=True)[source]#

Calculate moving average for smoothing time series.

Parameters:
  • data (ndarray) – Time series data

  • window (int) – Window size for moving average

  • center (bool) – Whether to center the window

Returns:

Smoothed time series

Return type:

ndarray

episia.stats.time_series.loess_smoothing(x, y, frac=0.3, iterations=1)[source]#

LOESS (Local Regression) smoothing.

Parameters:
  • x (ndarray) – Time points (equally spaced recommended)

  • y (ndarray) – Observations

  • frac (float) – Fraction of data to use for local regression

  • iterations (int) – Number of robustness iterations

Returns:

Smoothed values

Return type:

ndarray

episia.stats.time_series.detect_epidemic_threshold(counts, method='moving_average', window=7, multiplier=2.0)[source]#

Detect epidemic threshold using various methods.

Parameters:
  • counts (ndarray) – Daily case counts

  • method (str) – Detection method

  • window (int) – Window size for baseline

  • multiplier (float) – Multiplier for threshold

Returns:

Dictionary with threshold and flags

Return type:

Dict

episia.stats.time_series.reproductive_number(incidence, serial_interval=5.0, method='cori')[source]#

Estimate time-varying reproductive number (R_t).

Parameters:
  • incidence (ndarray) – Daily incidence

  • serial_interval (float) – Mean serial interval in days

  • method (str) – Estimation method

Returns:

Estimated R_t over time

Return type:

ndarray

episia.stats.time_series.seasonality_decomposition(time_series, period=365, model='additive')[source]#

Decompose time series into trend, seasonal, and residual components.

Parameters:
  • time_series (ndarray) – Time series data

  • period (int) – Seasonal period (e.g., 365 for daily annual)

  • model (str) – 'additive' or 'multiplicative'

Returns:

Dictionary with components

Return type:

Dict[str, ndarray]

episia.stats.time_series.exponential_growth_rate(cases, time_points=None)[source]#

Calculate exponential growth rate from case counts.

Parameters:
  • cases (ndarray) – Case counts over time

  • time_points (ndarray | None) – Time points (optional)

Returns:

Dictionary with growth rate, doubling time, and R² of the fit (keys growth_rate, doubling_time, r_squared)

Return type:

Dict[str, float]

episia.stats.time_series.nowcasting(reported_cases, delay_distribution, method='simple')[source]#

Perform nowcasting to estimate true incidence accounting for reporting delays.

Parameters:
  • reported_cases (ndarray) – Cases reported by date of report

  • delay_distribution (ndarray) – Probability distribution of reporting delays

  • method (str) – Nowcasting method

Returns:

Nowcasted incidence

Return type:

ndarray

episia.stats.time_series.cumulative_curve(daily_cases)[source]#

Calculate cumulative epidemic curve.

Parameters:

daily_cases (ndarray) – Daily new cases

Returns:

Cumulative cases

Return type:

ndarray

episia.stats.time_series.detect_peaks(time_series, height=None, distance=7, prominence=None)[source]#

Detect peaks in time series data.

Parameters:
  • time_series (ndarray) – Time series data

  • height (float | None) – Minimum height of peaks

  • distance (int) – Minimum distance between peaks

  • prominence (float | None) – Minimum prominence of peaks

Returns:

Dictionary with peak indices and properties

Return type:

Dict

Examples#

Creating an epidemic curve:

import numpy as np
import pandas as pd
from episia.stats.time_series import epidemic_curve, TimeAggregation

dates = pd.date_range('2024-01-01', periods=100, freq='D')
cases = np.random.poisson(lam=5, size=100)

curve = epidemic_curve(dates, cases, aggregation=TimeAggregation.WEEKLY)
print(curve.summary())

# Convert to DataFrame for analysis
df = curve.to_dataframe()

Moving average smoothing:

from episia.stats.time_series import moving_average

smoothed = moving_average(cases, window=7)

# 7-day moving average
for i, (orig, smooth) in enumerate(zip(cases[:10], smoothed[:10])):
    print(f"Day {i+1}: {orig} -> {smooth:.1f}")

Detecting epidemic threshold:

from episia.stats.time_series import detect_epidemic_threshold

result = detect_epidemic_threshold(
    cases,
    method='moving_average',
    window=7,
    multiplier=2.0
)

print(f"Threshold: {result['threshold']:.1f}")
print(f"Epidemic days: {result['epidemic_days']}")

Time-varying reproductive number:

from episia.stats.time_series import reproductive_number

Rt = reproductive_number(cases, serial_interval=5.0, method='cori')
for day, rt in enumerate(Rt[:30]):
    if rt > 0:
        print(f"Day {day+1}: Rt={rt:.2f}")

Exponential growth rate:

from episia.stats.time_series import exponential_growth_rate

growth = exponential_growth_rate(cases[:30])
print(f"Growth rate: {growth['growth_rate']:.3f} per day")
print(f"Doubling time: {growth['doubling_time']:.1f} days")
print(f"R²: {growth['r_squared']:.3f}")

Seasonality decomposition:

from episia.stats.time_series import seasonality_decomposition

# Weekly case counts with an annual cycle (52 weeks per period)
weekly_cases = np.random.poisson(lam=10, size=104)  # 2 years
decomp = seasonality_decomposition(weekly_cases, period=52, model='additive')

# Access components
trend = decomp['trend']
seasonal = decomp['seasonal']
residual = decomp['residual']

Peak detection:

from episia.stats.time_series import detect_peaks

peaks = detect_peaks(cases, distance=7, prominence=10)
print(f"Found {peaks['n_peaks']} peaks at indices: {peaks['peak_indices']}")