Skip to content

Architecture

System overview

The system has four independently deployable components that share only the object store (S3 or MinIO):

  • Pipeline — scheduled batch job that downloads EirGrid data and writes Parquet + JSON summaries.
  • API service — long-running FastAPI server (as a Lambda via Mangum in production) that serves the dashboard, the REST endpoints, and this documentation site at /guide/.
  • Gapcheck — daily Lambda that detects missing data across the last 31 days and schedules targeted backfills by async-invoking the pipeline Lambda. See Gap Detection.
  • Healthcheck — daily Lambda that probes every public /api/{area} endpoint and emails an alert on failure or staleness.
graph TD
    subgraph EirGrid["EirGrid Smart Grid Dashboard"]
        VARGEN["vargen API<br/>(wind, solar)"]
        INTERCONN["interconn API<br/>(interconnectors)"]
        DASH["DashboardService API<br/>(demand, CO₂, frequency,<br/>SNSP, fuelMix)"]
    end

    subgraph Pipeline["Pipeline (Lambda / Docker)"]
        SRC["EirGridSource<br/>async fetch()"]
        ORCH["Orchestrator<br/>run_pipeline()"]
        STORE["S3Storage<br/>upload_chunk()"]
        EXPORT["run_export()<br/>DuckDB → JSON"]
    end

    subgraph ObjectStore["Object Store (S3 / MinIO)"]
        PARQUET["Parquet files<br/>Hive-partitioned"]
        JSON["JSON summaries<br/>latest.json per area"]
    end

    subgraph API["API Service (FastAPI / Lambda+Mangum)"]
        CACHE["In-memory cache<br/>5-min TTL"]
        ENDPOINTS["REST endpoints<br/>/api/current<br/>/api/{area}<br/>/api/gaps"]
        UI["Built-in dashboard<br/>/ui/"]
        GUIDE["MkDocs site<br/>/guide/"]
    end

    VARGEN -->|HTTP| SRC
    INTERCONN -->|HTTP| SRC
    DASH -->|HTTP| SRC
    SRC --> ORCH
    ORCH --> STORE
    STORE -->|Snappy Parquet| PARQUET
    PARQUET -->|DuckDB glob| EXPORT
    EXPORT -->|JSON| JSON
    JSON -->|boto3| CACHE
    CACHE --> ENDPOINTS
    CACHE --> UI
    CACHE --> GUIDE

Gapcheck service

A third Lambda (eirgrid-downloader-gapcheck) runs once a day via EventBridge Scheduler. It walks the last 31 days of Parquet with DuckDB, identifies every (area, region) series that has two or more consecutive missing points, and asynchronously invokes the pipeline Lambda (one event per unique (area, region, day)). Attempts are recorded in an S3-hosted JSON ledger so the next run does not re-fire the same backfill before its cooldown expires. A cross-cutting gaps.json summary is written alongside the area summaries and served by the API at /api/gaps. See Gap Detection and ADR-006.

graph LR
    SCH["EventBridge Scheduler"] --> GC["gapcheck Lambda"]
    GC -->|DuckDB scan| S3P["S3 Parquet"]
    GC -->|async invoke<br/>per day| PL["pipeline Lambda"]
    GC -->|read / update| LED["S3 ledger.json"]
    GC -->|write| GSUM["S3 gaps.json"]
    PL -->|write merged| S3P

Pipeline internals

Each pipeline run processes sources sequentially. Within a source, all HTTP requests are issued concurrently (bounded by a semaphore). Chunks are uploaded to S3 as they arrive — the pipeline does not buffer all data in memory.

sequenceDiagram
    participant R as local_runner / Lambda
    participant O as Orchestrator
    participant S as EirGridSource
    participant H as httpx (async)
    participant ST as S3Storage
    participant OBJ as Object Store

    R->>O: run_pipeline(sources, config)
    loop for each source class
        O->>S: source.fetch()
        note over S: builds task list<br/>(month × area × region)
        S->>H: asyncio.gather(*tasks)
        H-->>S: DataChunks (or exceptions)
        loop for each DataChunk
            S-->>O: yield chunk
            O->>ST: upload_chunk(chunk, config)
            ST->>OBJ: GET existing Parquet
            OBJ-->>ST: existing rows (or 404)
            ST->>ST: concat + deduplicate
            ST->>OBJ: PUT merged Parquet
        end
    end
    O-->>R: PipelineResult
    R->>R: run_export(config)

Merge-on-write storage

A daily pipeline run with days_back=1 covers a date range that overlaps the current month's Parquet partition. Without merge-on-write, each run would overwrite the monthly file with only 1–2 days of data, destroying accumulated history.

S3Storage.upload_chunk always reads the existing file, concatenates new rows, deduplicates on (timestamp, Region, Field_Name) keeping the latest value, then writes the merged result back. This ensures a month partition always contains the full set of rows seen so far.

flowchart LR
    A["upload_chunk(chunk)"] --> B{Key exists\nin S3?}
    B -->|No| E["Write chunk.df\nas new Parquet"]
    B -->|Yes| C["Read existing\nParquet"]
    C --> D["concat existing + chunk.df\ndrop_duplicates keep=last"]
    D --> E
    E --> F["PUT to S3"]

API service

The API service loads all JSON summaries from S3 into an in-memory dict at startup, then refreshes every CACHE_TTL_SECONDS (default 5 min) using a daemon thread timer. All read endpoints are lock-protected but never block on S3.

graph LR
    subgraph Startup
        L["lifespan()"] --> SR["_schedule_refresh()"]
    end
    subgraph Background
        SR --> RC["_refresh_cache()"]
        RC -->|boto3 GET| S3["S3 / MinIO"]
        S3 --> RC
        RC --> CACHE[("_cache dict\n+ _cached_at")]
        RC --> T["Timer(TTL)\n→ _schedule_refresh()"]
    end
    subgraph Requests
        REQ["HTTP request"] --> LOCK["acquire _cache_lock\n(read snapshot)"]
        LOCK --> CACHE
        LOCK --> RESP["JSON response"]
    end

Healthcheck service

A third Lambda (eirgrid-downloader-healthcheck) runs once a day at 08:00 UTC via EventBridge Scheduler and probes every public /api/{area} endpoint. It is a stdlib + boto3 zip Lambda — separate from the pipeline/api container image — so a build break in the main image cannot disable the probe.

For each area the handler fetches the JSON summary, walks series for the newest point with a non-null value/actual/forecast, and flags the area as STALE if that point is more than STALENESS_HOURS (default 24) behind the probe time, or DOWN if the request errors. When any area fails, a single summary message is published to the eirgrid-downloader-healthcheck-alerts SNS topic, which fans out to the configured email subscription.

graph LR
    SCH["EventBridge Scheduler\ncron(0 8 * * ? *)"] --> HC["healthcheck Lambda"]
    HC -->|"GET /api/{area} × 7"| API["grid.lennonsec.org"]
    HC -->|publish on failure| SNS["SNS topic\nhealthcheck-alerts"]
    SNS --> MAIL["[email protected]"]

The handler accepts {"base_url": "...", "staleness_hours": N, "timeout": N} in the invoke event so the alert path can be exercised on demand without changing the schedule. See ADR-005.

Source interface

All data sources implement BaseSource. Adding a new source means creating a subclass and registering it — no changes to the orchestrator or storage layer.

classDiagram
    class BaseSource {
        <<abstract>>
        +source_id : ClassVar[str]
        +fetch() AsyncIterator[DataChunk]*
    }
    class EirGridSource {
        +source_id = "eirgrid"
        +fetch() AsyncIterator[DataChunk]
        -_fetch_vargen()
        -_fetch_interconn()
        -_fetch_dashboard()
        -_get_json()
        -_month_ranges()$
        -_week_ranges()$
        -_key()$
        -_key_daily()$
        -_end_timestamp()$
        -_is_retryable()$
    }
    class DataChunk {
        +source_id : str
        +s3_key_suffix : str
        +df : DataFrame
        +metadata : dict
    }
    class SourceConfig {
        +s3_bucket : str
        +s3_prefix : str
        +max_concurrency : int
        +request_timeout : int
        +max_retries : int
        +days_back : int
    }

    BaseSource <|-- EirGridSource
    EirGridSource ..> DataChunk : yields
    EirGridSource --> SourceConfig : reads

Retry strategy

HTTP retries use exponential back-off (backoff library). Only transient errors are retried — 4xx responses give up immediately to avoid hammering endpoints that don't support a given area/region combination.

flowchart TD
    REQ["HTTP GET"] --> OK{2xx?}
    OK -->|Yes| RETURN["return JSON"]
    OK -->|No| ERR{Error type}
    ERR -->|"5xx or\nTransportError"| RETRY{max_retries\nexceeded?}
    RETRY -->|No| BACKOFF["exponential\nback-off"] --> REQ
    RETRY -->|Yes| NONE["return None\n(logged)"]
    ERR -->|"4xx"| NONE