surveillance Module

Epidemiological surveillance data utilities.

This module provides tools for ingesting, cleaning, aggregating, and alerting on routine surveillance data, designed for public health contexts.

Classes

class episia.data.surveillance.SurveillanceDataset(df, *, date_col='date', cases_col='cases', deaths_col=None, district_col=None, disease_col=None, population_col=None)

Bases: object

Structured surveillance case count dataset.

Wraps a pandas DataFrame with columns:

  • date / week / period – time axis

  • district / site – spatial unit (optional)

  • disease – disease or syndrome name

  • cases – integer case count

  • deaths – integer death count (optional)

  • population – population at risk (optional)

Built from CSV, DHIS2 exports, or a plain DataFrame.

Example:

from episia.data.surveillance import SurveillanceDataset

ds = SurveillanceDataset.from_csv("meningite_2024.csv",
                                   date_col="semaine",
                                   cases_col="cas")
print(ds.summary())
ds.epicurve().plot().show()
alerts = ds.alert_engine().run()
Parameters:
  • date_col (str)

  • cases_col (str)

  • deaths_col (Optional[str])

  • district_col (Optional[str])

  • disease_col (Optional[str])

  • population_col (Optional[str])

__init__(df, *, date_col='date', cases_col='cases', deaths_col=None, district_col=None, disease_col=None, population_col=None)
Parameters:
  • date_col (str)

  • cases_col (str)

  • deaths_col (str | None)

  • district_col (str | None)

  • disease_col (str | None)

  • population_col (str | None)

aggregate(freq='W', group_by=None)

Aggregate cases by time frequency and optional grouping columns.

Parameters:
  • freq (str) – Pandas offset alias (‘D’=daily, ‘W’=weekly, ‘ME’=monthly).

  • group_by (List[str] | None) – Additional columns to group by (district, disease…).

Returns:

pandas DataFrame with aggregated counts.
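To illustrate what the freq aliases and group_by mean in practice, the sketch below performs a comparable aggregation with plain pandas. It is not the library's implementation: the column names (date, district, cases), the sample values, and the use of pd.Grouper are all assumptions made for this example.

```python
import pandas as pd

# Hypothetical daily counts; column names and values are assumptions.
df = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-01", "2024-01-03", "2024-01-08", "2024-01-09"]),
    "district": ["A", "A", "A", "B"],
    "cases": [2, 3, 4, 1],
})

# Weekly totals: with freq="W" each bin is a week ending on Sunday.
weekly = df.groupby(pd.Grouper(key="date", freq="W"))["cases"].sum()

# Weekly totals stratified by an extra grouping column.
stratified = (
    df.groupby(["district", pd.Grouper(key="date", freq="W")])["cases"]
    .sum()
    .reset_index()
)

print(weekly)
print(stratified)
```

Swapping "W" for "D" or "ME" changes only the width of the time bins; the grouping logic stays the same.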

attack_rate(population=None, per=100000)

Compute overall attack rate.

Parameters:
  • population (int | None) – Population denominator (uses population_col if None).

  • per (int) – Rate denominator (default 100,000).

Returns:

Attack rate expressed per ‘per’ population (for example, per 100,000 when per=100000).

Return type:

float
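The conventional attack rate is the number of cases divided by the population at risk, scaled by per. The sketch below shows that arithmetic only; the helper name attack_rate_sketch is hypothetical and the code is not taken from the library's source.

```python
def attack_rate_sketch(cases: int, population: int, per: int = 100_000) -> float:
    """Return cases per `per` persons at risk (illustrative helper, not the episia API)."""
    if population <= 0:
        raise ValueError("population must be positive")
    return cases / population * per


# 250 cases in a population of 1,000,000 corresponds to 25 cases per 100,000.
rate = attack_rate_sketch(250, 1_000_000)
print(rate)
```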

property cfr: float | None

Case fatality rate = total_deaths / total_cases.

property date_range: Tuple[Any, Any]
property df
property diseases: List[str]
property districts: List[str]
endemic_channel(historical_years=None, percentiles=(25, 50, 75))

Compute the endemic channel (historical percentile envelope).

Groups by ISO week number across historical years. Returns the percentile bands used for alert zone classification.

Parameters:
  • historical_years (List[int] | None) – Years to include (all years if None).

  • percentiles (Tuple[float, float, float]) – (low, median, high) percentiles.

Returns:

Dict with keys ‘weeks’, ‘p_low’, ‘p_mid’, ‘p_high’.

Return type:

Dict[str, Any]
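The percentile envelope described above can be sketched with plain pandas: group historical counts by ISO week number, then take the low, median, and high percentiles within each week. This is an illustration under stated assumptions, not the library's code; the column names, the sample counts, and the linear interpolation that pandas applies by default are all assumptions.

```python
import pandas as pd

# Hypothetical weekly counts from three historical years (ISO weeks 1 and 2 only).
df = pd.DataFrame({
    "date": pd.to_datetime([
        "2020-01-01", "2020-01-08",
        "2021-01-06", "2021-01-13",
        "2022-01-05", "2022-01-12",
    ]),
    "cases": [10, 20, 30, 40, 50, 60],
})

# ISO week number of each record.
df["week"] = df["date"].dt.isocalendar().week.astype(int)

# Percentiles of the counts observed for each week number across the years.
bands = df.groupby("week")["cases"].quantile([0.25, 0.50, 0.75]).unstack()

# Same key names as the documented return value.
channel = {
    "weeks": bands.index.tolist(),
    "p_low": bands[0.25].tolist(),
    "p_mid": bands[0.50].tolist(),
    "p_high": bands[0.75].tolist(),
}
print(channel)
```

A current-year count above p_high for its week would fall in the upper zone of the channel, while a count below p_low would fall in the lowest zone.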

filter_date(start=None, end=None)

Filter to a date range (inclusive).

Parameters:
  • start (Any | None)

  • end (Any | None)

Return type:

SurveillanceDataset
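An inclusive date filter means that records dated exactly on start or end are kept. The plain-pandas sketch below shows the equivalent operation on a DataFrame; the column names and sample values are assumptions, and the library's own method may handle missing bounds or date parsing differently.

```python
import pandas as pd

# Hypothetical weekly records.
df = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-01", "2024-01-08", "2024-01-15"]),
    "cases": [3, 5, 2],
})

start, end = pd.Timestamp("2024-01-08"), pd.Timestamp("2024-01-15")

# Series.between is inclusive on both ends by default.
subset = df[df["date"].between(start, end)]
print(subset)
```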

filter_disease(disease)

Return a new dataset filtered to a single disease.

Parameters:

disease (str)

Return type:

SurveillanceDataset

filter_district(district)

Return a new dataset filtered to a single district.

Parameters:

district (str)

Return type:

SurveillanceDataset

classmethod from_csv(path, date_col='date', cases_col='cases', deaths_col=None, district_col=None, disease_col=None, population_col=None, **read_kwargs)

Load from a CSV file.

Parameters:
  • path (str | Path) – Path to CSV file.

  • date_col (str) – Column name for date / week.

  • cases_col (str) – Column name for case counts.

  • deaths_col (str | None) – Column name for deaths (optional).

  • district_col (str | None) – Column name for district / site (optional).

  • disease_col (str | None) – Column name for disease / syndrome (optional).

  • population_col (str | None) – Column for population at risk (optional).

  • **read_kwargs – Passed to pd.read_csv.

Returns:

SurveillanceDataset.

Return type:

SurveillanceDataset

classmethod from_dataframe(df, **kwargs)

Wrap an existing DataFrame.

Return type:

SurveillanceDataset

classmethod from_dict(data, **kwargs)

Create from a plain dict of lists.

Parameters:

data (Dict[str, List])

Return type:

SurveillanceDataset

property n_records: int
summary()

Return a summary statistics dict.

Return type:

Dict[str, Any]

to_timeseries_result()

Convert to api.results.TimeSeriesResult for viz integration.

Returns:

TimeSeriesResult ready for plot_epicurve().

property total_cases: int
property total_deaths: int | None
weekly_attack_rates(population, per=100000)

Compute weekly attack rates.

Parameters:
  • population (int) – Population at risk.

  • per (int) – Rate denominator.

Returns:

pandas DataFrame with columns period, cases, attack_rate.

class episia.data.surveillance.AlertEngine(dataset)

Bases: object

Threshold-based and statistical alert detection for surveillance data.

Example:

from episia.data.surveillance import AlertEngine

# dataset is an existing SurveillanceDataset
engine = AlertEngine(dataset)
alerts = engine.run(
    threshold=10,
    zscore_threshold=2.0,
    use_endemic_channel=True,
)
for a in alerts:
    print(a.period, a.severity, a.message)
Parameters:

dataset (SurveillanceDataset)

__init__(dataset)
Parameters:

dataset (SurveillanceDataset)

alert_summary(alerts)

Summarise a list of alerts.

Parameters:

alerts (List[Alert])

Return type:

Dict[str, Any]

run(threshold=None, zscore_threshold=2.0, use_endemic_channel=False, historical_years=None, freq='W')

Run all enabled alert detectors.

Parameters:
  • threshold (float | None) – Absolute case count threshold.

  • zscore_threshold (float) – Z-score threshold for statistical alert.

  • use_endemic_channel (bool) – Use endemic channel (requires ≥3 historical years).

  • historical_years (List[int] | None) – Years to use for endemic channel baseline.

  • freq (str) – Aggregation frequency (‘D’, ‘W’, ‘ME’).

Returns:

List of Alert objects, sorted by period.

Return type:

List[Alert]
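The documentation does not state how the statistical detector computes its z-scores. As one possible reading, the sketch below standardises each period's count against the mean and sample standard deviation of the whole series and flags values above zscore_threshold. The counts, the choice of baseline, and the variable names are assumptions for illustration only.

```python
from statistics import mean, stdev

# Hypothetical weekly case counts (week 1 first).
counts = [4, 5, 6, 5, 4, 6, 5, 30]
zscore_threshold = 2.0

mu = mean(counts)
sigma = stdev(counts)  # sample standard deviation

# Keep (week, value) pairs whose z-score exceeds the threshold.
flagged = [
    (week, value)
    for week, value in enumerate(counts, start=1)
    if sigma > 0 and (value - mu) / sigma > zscore_threshold
]
print(flagged)
```

Because the spike itself inflates the mean and standard deviation here, a detector built this way becomes less sensitive as outbreaks grow; a baseline drawn from historical periods only would avoid that.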

class episia.data.surveillance.Alert(period, value, threshold, kind, severity, district=None, disease=None, message='')

Bases: object

A single surveillance alert.

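Parameters:
  • period (Any)

  • value (float)

  • threshold (float)

  • kind (str)

  • severity (str)

  • district (str | None)

  • disease (str | None)

  • message (str)

__init__(period, value, threshold, kind, severity, district=None, disease=None, message='')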
Return type:

None

disease: str | None = None
district: str | None = None
kind: str
message: str = ''
period: Any
severity: str
threshold: float
value: float

Functions

episia.data.surveillance.from_dhis2_csv(path, date_col='periodName', cases_col='value', district_col='orgUnitName', **kwargs)

Load a DHIS2 standard CSV export.

DHIS2 exports typically have columns:

periodName, orgUnitName, dataElementName, value, …

Parameters:
  • path (str | Path) – Path to DHIS2 CSV export.

  • date_col (str) – Column with period label.

  • cases_col (str) – Column with case count value.

  • district_col (str) – Column with organisation unit name.

  • **kwargs – Passed to pd.read_csv.

Returns:

SurveillanceDataset.

Return type:

SurveillanceDataset
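The default arguments map the DHIS2 column names onto the dataset's date, cases, and district roles. The sketch below shows that mapping with plain pandas on a two-row in-memory sample; the rows, the period label format, and the target column names are assumptions, and the loader itself may parse period labels in ways not shown here.

```python
import io

import pandas as pd

# Hypothetical export using the column names listed above.
raw = io.StringIO(
    "periodName,orgUnitName,dataElementName,value\n"
    "2024W1,District A,Meningitis cases,12\n"
    "2024W2,District A,Meningitis cases,7\n"
)

df = pd.read_csv(raw)

# Map the DHIS2 column names onto generic surveillance column names.
tidy = df.rename(columns={
    "periodName": "date",
    "orgUnitName": "district",
    "value": "cases",
})
print(tidy[["date", "district", "cases"]])
```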

episia.data.surveillance.compute_attack_rate(cases, population, per=100000)

Compute attack rate.

Parameters:
  • cases (int) – Number of cases.

  • population (int) – Population at risk.

  • per (int) – Rate denominator (default 100,000).

Returns:

Attack rate expressed per ‘per’ population (for example, per 100,000 when per=100000).

Return type:

float

episia.data.surveillance.endemic_channel(dataset, historical_years=None, percentiles=(25, 50, 75))

Module-level alias for dataset.endemic_channel().

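Parameters:
  • dataset (SurveillanceDataset)

  • historical_years (List[int] | None)

  • percentiles (Tuple[float, float, float])
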
Return type:

Dict[str, Any]

episia.data.surveillance.aggregate_by(dataset, freq='W', group_by=None)

Module-level alias for dataset.aggregate().

Parameters:
  • dataset (SurveillanceDataset)

  • freq (str)

  • group_by (List[str] | None)

Examples

Creating a surveillance dataset:

from episia.data.surveillance import SurveillanceDataset

# From CSV
ds = SurveillanceDataset.from_csv(
    "meningite_2024.csv",
    date_col="semaine",
    cases_col="cas",
    district_col="district",
    disease_col="maladie"
)

# Basic information
print(f"Total cases: {ds.total_cases}")
print(f"Date range: {ds.date_range}")
print(f"Districts: {ds.districts}")

Data aggregation:

# Aggregate weekly
weekly = ds.aggregate(freq="W")

# Aggregate by district and week
stratified = ds.aggregate(freq="W", group_by=["district"])

Epidemiological metrics:

# Attack rate
ar = ds.attack_rate(population=1000000, per=100000)

# Weekly attack rates
weekly_ar = ds.weekly_attack_rates(population=1000000)

# Endemic channel
channel = ds.endemic_channel(historical_years=[2020, 2021, 2022])
print(f"P75 threshold: {channel['p_high']}")

Alert detection:

from episia.data.surveillance import AlertEngine

# Create alert engine
engine = AlertEngine(ds)

# Run alerts
alerts = engine.run(
    threshold=50,
    zscore_threshold=2.0,
    use_endemic_channel=True
)

for alert in alerts:
    print(f"{alert.period}: {alert.severity} - {alert.message}")

# Alert summary
summary = engine.alert_summary(alerts)
print(f"Alerts by severity: {summary['severity_counts']}")

DHIS2 integration:

from episia.data.surveillance import from_dhis2_csv

# Load DHIS2 export
ds = from_dhis2_csv("dhis2_export.csv")