surveillance Module
Epidemiological surveillance data utilities.
This module provides tools for ingesting, cleaning, aggregating, and alerting on routine surveillance data, designed for public health contexts.
Classes
- class episia.data.surveillance.SurveillanceDataset(df, *, date_col='date', cases_col='cases', deaths_col=None, district_col=None, disease_col=None, population_col=None)
Bases: object
Structured surveillance case count dataset.
- Wraps a pandas DataFrame with columns:
date / week / period – time axis
district / site – spatial unit (optional)
disease – disease or syndrome name
cases – integer case count
deaths – integer death count (optional)
population – population at risk (optional)
Built from CSV, DHIS2 exports, or a plain DataFrame.
Example:
from episia.data.surveillance import SurveillanceDataset

ds = SurveillanceDataset.from_csv("meningite_2024.csv", date_col="semaine", cases_col="cas")
print(ds.summary())
ds.epicurve().plot().show()
alerts = ds.alert_engine().run()
- __init__(df, *, date_col='date', cases_col='cases', deaths_col=None, district_col=None, disease_col=None, population_col=None)
- aggregate(freq='W', group_by=None)
Aggregate cases by time frequency and optional grouping columns.
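How `aggregate` bins dates internally is not documented here. As a rough illustration of weekly aggregation in general, the sketch below sums daily counts per ISO week using only the standard library. The function name `aggregate_weekly` and the `(date, cases)` input shape are assumptions made for this example, not part of the module.

```python
from collections import defaultdict
from datetime import date


def aggregate_weekly(records):
    """Sum case counts per ISO (year, week) from (date, cases) pairs.

    Hypothetical helper for illustration; not the library's implementation.
    """
    totals = defaultdict(int)
    for day, cases in records:
        iso_year, iso_week, _ = day.isocalendar()
        totals[(iso_year, iso_week)] += cases
    # Return a plain dict ordered by (year, week)
    return dict(sorted(totals.items()))


records = [
    (date(2024, 1, 1), 3),  # Monday of ISO week 1
    (date(2024, 1, 3), 2),  # same ISO week
    (date(2024, 1, 8), 5),  # Monday of ISO week 2
]
weekly = aggregate_weekly(records)
print(weekly)
```

Adding a grouping column such as district would amount to extending the dictionary key with that column's value.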
- property df
- endemic_channel(historical_years=None, percentiles=(25, 50, 75))
Compute the endemic channel (historical percentile envelope).
Groups by ISO week number across historical years. Returns the percentile bands used for alert zone classification.
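The percentile envelope described above can be sketched as follows, assuming weekly counts are already keyed by `(year, iso_week)`. The helper names, the linear-interpolation percentile rule, and the shape of the returned mapping are illustrative assumptions; the library's actual output format may differ.

```python
from collections import defaultdict


def percentile(sorted_vals, p):
    """Linear-interpolation percentile (0 <= p <= 100) of a sorted list."""
    if len(sorted_vals) == 1:
        return float(sorted_vals[0])
    rank = (len(sorted_vals) - 1) * p / 100
    lo = int(rank)
    hi = min(lo + 1, len(sorted_vals) - 1)
    return sorted_vals[lo] + (sorted_vals[hi] - sorted_vals[lo]) * (rank - lo)


def endemic_channel(weekly_counts, percentiles=(25, 50, 75)):
    """Map each ISO week to its percentile bands across historical years.

    weekly_counts: {(year, iso_week): cases}  (assumed input shape)
    """
    by_week = defaultdict(list)
    for (_, week), cases in weekly_counts.items():
        by_week[week].append(cases)
    return {
        week: {p: percentile(sorted(vals), p) for p in percentiles}
        for week, vals in sorted(by_week.items())
    }


# Made-up counts for two ISO weeks over three historical years
history = {
    (2020, 1): 10, (2021, 1): 20, (2022, 1): 30,
    (2020, 2): 4, (2021, 2): 8, (2022, 2): 6,
}
channel = endemic_channel(history)
print(channel[1])
```

A current week's count can then be compared against the bands for the same ISO week to decide which zone it falls in.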
- filter_date(start=None, end=None)
Filter to the date range from start to end (both inclusive).
- filter_disease(disease)
Return a new dataset filtered to a single disease.
- Parameters:
disease (str)
- Return type:
SurveillanceDataset
- filter_district(district)
Return a new dataset filtered to a single district.
- Parameters:
district (str)
- Return type:
SurveillanceDataset
- classmethod from_csv(path, date_col='date', cases_col='cases', deaths_col=None, district_col=None, disease_col=None, population_col=None, **read_kwargs)
Load from a CSV file.
- Parameters:
date_col (str) – Column name for date / week.
cases_col (str) – Column name for case counts.
deaths_col (str | None) – Column name for deaths (optional).
district_col (str | None) – Column name for district / site (optional).
disease_col (str | None) – Column name for disease / syndrome (optional).
population_col (str | None) – Column for population at risk (optional).
**read_kwargs – Passed to pd.read_csv.
- Returns:
SurveillanceDataset.
- Return type:
SurveillanceDataset
- classmethod from_dict(data, **kwargs)
Create from a plain dict of lists.
- Return type:
SurveillanceDataset
- class episia.data.surveillance.AlertEngine(dataset)
Bases: object
Threshold-based and statistical alert detection for surveillance data.
Example:
engine = AlertEngine(dataset)
alerts = engine.run(
    threshold=10,
    zscore_threshold=2.0,
    use_endemic_channel=True,
)
for a in alerts:
    print(a.period, a.severity, a.message)
- Parameters:
dataset (SurveillanceDataset)
- __init__(dataset)
- Parameters:
dataset (SurveillanceDataset)
- run(threshold=None, zscore_threshold=2.0, use_endemic_channel=False, historical_years=None, freq='W')
Run all enabled alert detectors.
- Parameters:
threshold (float | None) – Absolute case count threshold.
zscore_threshold (float) – Z-score threshold for statistical alert.
use_endemic_channel (bool) – Use endemic channel (requires ≥3 historical years).
historical_years (List[int] | None) – Years to use for endemic channel baseline.
freq (str) – Aggregation frequency ('D' for daily, 'W' for weekly, 'ME' for month end).
- Returns:
List of Alert objects, sorted by period.
- Return type:
List[Alert]
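To make the two simplest detectors concrete, here is a minimal sketch of an absolute-threshold check combined with a z-score check. It is not the `AlertEngine` implementation: the function name `detect_alerts`, the tuple output, the use of `>=` comparisons, and the choice to compute mean and standard deviation over the whole series (including the period being tested) are all assumptions made for illustration.

```python
from statistics import mean, stdev


def detect_alerts(counts, threshold=None, zscore_threshold=2.0):
    """Flag periods by absolute threshold and by z-score.

    counts: list of (period, cases) pairs.
    Returns a list of (period, reason) tuples in input order.
    Hypothetical helper; the real engine returns Alert objects.
    """
    values = [cases for _, cases in counts]
    mu = mean(values)
    # Sample standard deviation; undefined for fewer than two points
    sigma = stdev(values) if len(values) > 1 else 0.0
    alerts = []
    for period, cases in counts:
        if threshold is not None and cases >= threshold:
            alerts.append((period, "threshold"))
        if sigma > 0 and (cases - mu) / sigma >= zscore_threshold:
            alerts.append((period, "zscore"))
    return alerts


# Made-up weekly counts with one obvious spike in the last week
counts = [
    ("2024-W01", 4), ("2024-W02", 5), ("2024-W03", 6), ("2024-W04", 5),
    ("2024-W05", 4), ("2024-W06", 6), ("2024-W07", 5), ("2024-W08", 30),
]
alerts = detect_alerts(counts, threshold=10, zscore_threshold=2.0)
print(alerts)
```

Including the tested period in the baseline makes large spikes slightly harder to flag, since the spike inflates both the mean and the standard deviation; a baseline restricted to earlier periods is a common alternative.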
Functions
- episia.data.surveillance.from_dhis2_csv(path, date_col='periodName', cases_col='value', district_col='orgUnitName', **kwargs)
Load a DHIS2 standard CSV export.
- DHIS2 exports typically have columns:
periodName, orgUnitName, dataElementName, value, …
- Returns:
SurveillanceDataset.
- Return type:
SurveillanceDataset
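The column mapping implied by the defaults above can be illustrated with the standard library alone. The sketch below reads a small in-memory CSV that uses the DHIS2 column names listed earlier and renames them to generic keys. The sample rows, the period strings, and the helper name `read_dhis2_rows` are invented for this example; real exports may carry additional columns and different period formats.

```python
import csv
import io

# Invented sample using the column names listed above
SAMPLE = """periodName,orgUnitName,dataElementName,value
2024W1,District A,Meningitis cases,3
2024W1,District B,Meningitis cases,1
2024W2,District A,Meningitis cases,5
"""


def read_dhis2_rows(handle, date_col="periodName", cases_col="value",
                    district_col="orgUnitName"):
    """Read DHIS2-style rows and rename columns to generic keys.

    Hypothetical helper for illustration; from_dhis2_csv itself
    returns a SurveillanceDataset rather than a list of dicts.
    """
    rows = []
    for raw in csv.DictReader(handle):
        rows.append({
            "date": raw[date_col],
            "district": raw[district_col],
            "cases": int(raw[cases_col]),
        })
    return rows


rows = read_dhis2_rows(io.StringIO(SAMPLE))
total = sum(r["cases"] for r in rows)
print(total)
```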
- episia.data.surveillance.compute_attack_rate(cases, population, per=100000)
Compute the attack rate: cases divided by population at risk, expressed per `per` people (default 100,000).
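As a worked illustration of the arithmetic, the sketch below applies the standard definition of an attack rate (cases over population at risk, scaled by `per`). The function name `attack_rate` and the validation of `population` are assumptions for this example; how `compute_attack_rate` handles a zero or missing population is not documented here.

```python
def attack_rate(cases, population, per=100_000):
    """Return cases per `per` people at risk.

    Illustrative helper, not the library function.
    """
    if population <= 0:
        raise ValueError("population must be positive")
    # Multiply before dividing to keep the arithmetic exact for whole numbers
    return cases * per / population


rate = attack_rate(250, 1_000_000)
print(rate)  # 250 cases in 1,000,000 people is 25 per 100,000
```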
Examples
Creating a surveillance dataset:
from episia.data.surveillance import SurveillanceDataset
# From CSV
ds = SurveillanceDataset.from_csv(
    "meningite_2024.csv",
    date_col="semaine",
    cases_col="cas",
    district_col="district",
    disease_col="maladie",
)
# Basic information
print(f"Total cases: {ds.total_cases}")
print(f"Date range: {ds.date_range}")
print(f"Districts: {ds.districts}")
Data aggregation:
# Aggregate weekly
weekly = ds.aggregate(freq="W")
# Aggregate by district and week
stratified = ds.aggregate(freq="W", group_by=["district"])
Epidemiological metrics:
# Attack rate
ar = ds.attack_rate(population=1000000, per=100000)
# Weekly attack rates
weekly_ar = ds.weekly_attack_rates(population=1000000)
# Endemic channel
channel = ds.endemic_channel(historical_years=[2020, 2021, 2022])
print(f"P75 threshold: {channel['p_high']}")
Alert detection:
from episia.data.surveillance import AlertEngine
# Create alert engine
engine = AlertEngine(ds)
# Run alerts
alerts = engine.run(
    threshold=50,
    zscore_threshold=2.0,
    use_endemic_channel=True,
)
for alert in alerts:
    print(f"{alert.period}: {alert.severity} - {alert.message}")
# Alert summary
summary = engine.alert_summary(alerts)
print(f"Alerts by severity: {summary['severity_counts']}")
DHIS2 integration:
from episia.data.surveillance import from_dhis2_csv
# Load DHIS2 export
ds = from_dhis2_csv("dhis2_export.csv")