Architecture
System overview
The system has four independently deployable components that share only the object store (S3 or MinIO):
- Pipeline: scheduled batch job that downloads EirGrid data and writes Parquet + JSON summaries.
- API service: long-running FastAPI server (as a Lambda via Mangum in production) that serves the dashboard, the REST endpoints, and this documentation site at /guide/.
- Gapcheck: daily Lambda that detects missing data across the last 31 days and schedules targeted backfills by async-invoking the pipeline Lambda. See Gap Detection.
- Healthcheck: daily Lambda that probes every public /api/{area} endpoint and emails an alert on failure or staleness.
graph TD
subgraph EirGrid["EirGrid Smart Grid Dashboard"]
VARGEN["vargen API<br/>(wind, solar)"]
INTERCONN["interconn API<br/>(interconnectors)"]
DASH["DashboardService API<br/>(demand, CO₂, frequency,<br/>SNSP, fuelMix)"]
end
subgraph Pipeline["Pipeline (Lambda / Docker)"]
SRC["EirGridSource<br/>async fetch()"]
ORCH["Orchestrator<br/>run_pipeline()"]
STORE["S3Storage<br/>upload_chunk()"]
EXPORT["run_export()<br/>DuckDB → JSON"]
end
subgraph ObjectStore["Object Store (S3 / MinIO)"]
PARQUET["Parquet files<br/>Hive-partitioned"]
JSON["JSON summaries<br/>latest.json per area"]
end
subgraph API["API Service (FastAPI / Lambda+Mangum)"]
CACHE["In-memory cache<br/>5-min TTL"]
ENDPOINTS["REST endpoints<br/>/api/current<br/>/api/{area}<br/>/api/gaps"]
UI["Built-in dashboard<br/>/ui/"]
GUIDE["MkDocs site<br/>/guide/"]
end
VARGEN -->|HTTP| SRC
INTERCONN -->|HTTP| SRC
DASH -->|HTTP| SRC
SRC --> ORCH
ORCH --> STORE
STORE -->|Snappy Parquet| PARQUET
PARQUET -->|DuckDB glob| EXPORT
EXPORT -->|JSON| JSON
JSON -->|boto3| CACHE
CACHE --> ENDPOINTS
CACHE --> UI
CACHE --> GUIDE
Gapcheck service
A third Lambda (eirgrid-downloader-gapcheck) runs once a day via EventBridge Scheduler. It walks the last 31 days of Parquet with DuckDB, identifies every (area, region) series that has two or more consecutive missing points, and asynchronously invokes the pipeline Lambda (one event per unique (area, region, day)). Attempts are recorded in an S3-hosted JSON ledger so the next run does not re-fire the same backfill before its cooldown expires. A cross-cutting gaps.json summary is written alongside the area summaries and served by the API at /api/gaps. See Gap Detection and ADR-006.
graph LR
SCH["EventBridge Scheduler"] --> GC["gapcheck Lambda"]
GC -->|DuckDB scan| S3P["S3 Parquet"]
GC -->|async invoke<br/>per day| PL["pipeline Lambda"]
GC -->|read / update| LED["S3 ledger.json"]
GC -->|write| GSUM["S3 gaps.json"]
PL -->|write merged| S3P
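The "two or more consecutive missing points" rule can be illustrated in plain Python. This is only a sketch of the idea, not the gapcheck Lambda's DuckDB query: the function name `find_gaps`, the 15-minute step, and the sample timestamps are all assumptions made for the example.

```python
from datetime import datetime, timedelta


def find_gaps(present: set, start: datetime, end: datetime,
              step: timedelta, min_run: int = 2) -> list:
    """Return (first_missing, last_missing) for each run of >= min_run missing points."""
    gaps, run = [], []
    ts = start
    while ts <= end:
        if ts not in present:
            run.append(ts)  # extend the current run of missing points
        else:
            if len(run) >= min_run:
                gaps.append((run[0], run[-1]))
            run = []  # a present point ends the run
        ts += step
    if len(run) >= min_run:  # a run that reaches the end of the window
        gaps.append((run[0], run[-1]))
    return gaps


step = timedelta(minutes=15)  # assumed sampling interval
start = datetime(2024, 5, 1, 0, 0)
expected = [start + i * step for i in range(8)]  # 00:00 .. 01:45
# index 2 is a single missing point (ignored); indexes 4 and 5 form a gap
present = {t for i, t in enumerate(expected) if i not in (2, 4, 5)}
gaps = find_gaps(present, start, expected[-1], step)
```

Here the isolated hole at 00:30 is ignored, while the adjacent holes at 01:00 and 01:15 are reported as one gap. In the real service each such gap would then be mapped to the day(s) it touches before a backfill event is sent.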
Pipeline internals
Each pipeline run processes sources sequentially. Within a source, all HTTP requests are issued concurrently (bounded by a semaphore). Chunks are uploaded to S3 as they arrive — the pipeline does not buffer all data in memory.
sequenceDiagram
participant R as local_runner / Lambda
participant O as Orchestrator
participant S as EirGridSource
participant H as httpx (async)
participant ST as S3Storage
participant OBJ as Object Store
R->>O: run_pipeline(sources, config)
loop for each source class
O->>S: source.fetch()
note over S: builds task list<br/>(month × area × region)
S->>H: asyncio.gather(*tasks)
H-->>S: DataChunks (or exceptions)
loop for each DataChunk
S-->>O: yield chunk
O->>ST: upload_chunk(chunk, config)
ST->>OBJ: GET existing Parquet
OBJ-->>ST: existing rows (or 404)
ST->>ST: concat + deduplicate
ST->>OBJ: PUT merged Parquet
end
end
O-->>R: PipelineResult
R->>R: run_export(config)
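The semaphore-bounded fan-out described in this section can be sketched with `asyncio` alone. This is a minimal illustration rather than the project's code: `fetch_one`, `fetch_all`, the `stats` dict, and the simulated latency are hypothetical stand-ins for the real HTTP requests.

```python
import asyncio


async def fetch_one(task_id: int, sem: asyncio.Semaphore, stats: dict) -> int:
    """Hypothetical stand-in for one HTTP request."""
    async with sem:  # at most max_concurrency coroutines pass this point
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # simulated network latency
        stats["active"] -= 1
        return task_id * 2


async def fetch_all(n_tasks: int, max_concurrency: int) -> tuple:
    sem = asyncio.Semaphore(max_concurrency)
    stats = {"active": 0, "peak": 0}
    tasks = [fetch_one(i, sem, stats) for i in range(n_tasks)]
    # return_exceptions=True hands failures back as values instead of raising
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results, stats["peak"]


results, peak = asyncio.run(fetch_all(n_tasks=10, max_concurrency=3))
```

All ten coroutines are created up front, but the semaphore lets only three run their request body at any moment. `asyncio.gather` returns results in task order regardless of completion order.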
Merge-on-write storage
A daily pipeline run with days_back=1 covers a date range that overlaps the current month's Parquet partition. Without merge-on-write, each run would overwrite the monthly file with only 1–2 days of data, destroying accumulated history.
S3Storage.upload_chunk always reads the existing file, concatenates new rows, deduplicates on (timestamp, Region, Field_Name) keeping the latest value, then writes the merged result back. This ensures a month partition always contains the full set of rows seen so far.
flowchart LR
A["upload_chunk(chunk)"] --> B{Key exists\nin S3?}
B -->|No| E["Write chunk.df\nas new Parquet"]
B -->|Yes| C["Read existing\nParquet"]
C --> D["concat existing + chunk.df\ndrop_duplicates keep=last"]
D --> E
E --> F["PUT to S3"]
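The concat-and-deduplicate step can be sketched with pandas. This assumes the three key columns named above plus a hypothetical `Value` column; the helper name `merge_rows` is illustrative, and the S3 read and write around it are omitted.

```python
from typing import Optional

import pandas as pd

DEDUP_KEYS = ["timestamp", "Region", "Field_Name"]


def merge_rows(existing: Optional[pd.DataFrame], new: pd.DataFrame) -> pd.DataFrame:
    """Merge new rows into an existing partition; the newest value wins."""
    if existing is None:  # key not found in the object store
        return new.reset_index(drop=True)
    merged = pd.concat([existing, new], ignore_index=True)
    # new rows come last, so keep="last" prefers them on key collisions
    merged = merged.drop_duplicates(subset=DEDUP_KEYS, keep="last")
    return merged.reset_index(drop=True)


existing = pd.DataFrame({
    "timestamp": ["2024-05-01 00:00", "2024-05-01 00:15"],
    "Region": ["ALL", "ALL"],
    "Field_Name": ["demand", "demand"],
    "Value": [100.0, 110.0],  # hypothetical value column
})
new = pd.DataFrame({
    "timestamp": ["2024-05-01 00:15", "2024-05-01 00:30"],
    "Region": ["ALL", "ALL"],
    "Field_Name": ["demand", "demand"],
    "Value": [111.0, 120.0],
})
merged = merge_rows(existing, new)
```

The 00:15 row appears in both frames; after the merge only the newer value (111.0) survives, and the partition holds three rows instead of two or four.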
API service
The API service loads all JSON summaries from S3 into an in-memory dict at startup, then refreshes every CACHE_TTL_SECONDS (default 5 min) using a daemon thread timer. Read endpoints take the cache lock only long enough to grab a snapshot of the dict, so a request never waits on S3.
graph LR
subgraph Startup
L["lifespan()"] --> SR["_schedule_refresh()"]
end
subgraph Background
SR --> RC["_refresh_cache()"]
RC -->|boto3 GET| S3["S3 / MinIO"]
S3 --> RC
RC --> CACHE[("_cache dict\n+ _cached_at")]
RC --> T["Timer(TTL)\n→ _schedule_refresh()"]
end
subgraph Requests
REQ["HTTP request"] --> LOCK["acquire _cache_lock\n(read snapshot)"]
LOCK --> CACHE
LOCK --> RESP["JSON response"]
end
Healthcheck service
A fourth Lambda (eirgrid-downloader-healthcheck) runs once a day at 08:00 UTC via EventBridge Scheduler and probes every public /api/{area} endpoint. It is a stdlib + boto3 zip Lambda, built separately from the pipeline/api container image, so a build break in the main image cannot disable the probe.
For each area the handler fetches the JSON summary, walks series for the newest point with a non-null value/actual/forecast, and flags the area as STALE if that point is more than STALENESS_HOURS (default 24) behind the probe time, or DOWN if the request errors. When any area fails, a single summary message is published to the eirgrid-downloader-healthcheck-alerts SNS topic, which fans out to the configured email subscription.
graph LR
SCH["EventBridge Scheduler\ncron(0 8 * * ? *)"] --> HC["healthcheck Lambda"]
HC -->|"GET /api/{area} × 7"| API["grid.lennonsec.org"]
HC -->|publish on failure| SNS["SNS topic\nhealthcheck-alerts"]
SNS --> MAIL["email subscription"]
The handler accepts {"base_url": "...", "staleness_hours": N, "timeout": N} in the invoke event so the alert path can be exercised on demand without changing the schedule. See ADR-005.
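The staleness rule can be sketched as below. The JSON shape is an assumption (a `series` list whose items carry a `points` list with ISO-8601 `timestamp` fields), as are the function names and the `OK` status label. Only `STALE`, `DOWN`, the three value keys, and the 24-hour default come from the description above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

VALUE_KEYS = ("value", "actual", "forecast")


def newest_point_time(summary: dict) -> Optional[datetime]:
    """Return the timestamp of the newest point carrying a non-null value."""
    newest = None
    for series in summary.get("series", []):
        for point in series.get("points", []):  # assumed layout
            if all(point.get(k) is None for k in VALUE_KEYS):
                continue  # placeholder point with no data
            ts = datetime.fromisoformat(point["timestamp"])
            if newest is None or ts > newest:
                newest = ts
    return newest


def classify(summary: Optional[dict], now: datetime, staleness_hours: int = 24) -> str:
    if summary is None:  # the request errored
        return "DOWN"
    newest = newest_point_time(summary)
    # an area with no usable points is treated as stale (an assumption)
    if newest is None or now - newest > timedelta(hours=staleness_hours):
        return "STALE"
    return "OK"


now = datetime(2024, 5, 1, 8, 0, tzinfo=timezone.utc)
fresh = {"series": [{"points": [
    {"timestamp": "2024-05-01T07:00:00+00:00", "value": 4200.0},
    {"timestamp": "2024-05-01T07:15:00+00:00", "value": None},
]}]}
stale = {"series": [{"points": [
    {"timestamp": "2024-04-29T07:00:00+00:00", "actual": 1.0},
]}]}
statuses = [classify(fresh, now), classify(stale, now), classify(None, now)]
```

The null point at 07:15 is skipped, so the fresh area is judged by its 07:00 point, one hour behind the probe time. The stale area's newest point is 49 hours old and exceeds the 24-hour threshold.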
Source interface
All data sources implement BaseSource. Adding a new source means creating a subclass and registering it — no changes to the orchestrator or storage layer.
classDiagram
class BaseSource {
<<abstract>>
+source_id : ClassVar[str]
+fetch() AsyncIterator[DataChunk]*
}
class EirGridSource {
+source_id = "eirgrid"
+fetch() AsyncIterator[DataChunk]
-_fetch_vargen()
-_fetch_interconn()
-_fetch_dashboard()
-_get_json()
-_month_ranges()$
-_week_ranges()$
-_key()$
-_key_daily()$
-_end_timestamp()$
-_is_retryable()$
}
class DataChunk {
+source_id : str
+s3_key_suffix : str
+df : DataFrame
+metadata : dict
}
class SourceConfig {
+s3_bucket : str
+s3_prefix : str
+max_concurrency : int
+request_timeout : int
+max_retries : int
+days_back : int
}
BaseSource <|-- EirGridSource
EirGridSource ..> DataChunk : yields
EirGridSource --> SourceConfig : reads
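As a rough illustration of the contract in the class diagram, a new source could look like the sketch below. `DemoSource` and `collect` are hypothetical, `df` is typed as `Any` to keep the example dependency-free, and the registration step is not shown because its mechanism is not described here.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, ClassVar


@dataclass
class DataChunk:
    source_id: str
    s3_key_suffix: str
    df: Any  # a DataFrame in the real system
    metadata: dict = field(default_factory=dict)


class BaseSource(ABC):
    source_id: ClassVar[str]

    @abstractmethod
    def fetch(self) -> AsyncIterator[DataChunk]:
        """Yield DataChunks as they become available."""


class DemoSource(BaseSource):
    """Hypothetical source used only for illustration."""
    source_id = "demo"

    async def fetch(self) -> AsyncIterator[DataChunk]:
        for month in ("2024-04", "2024-05"):
            # an async generator: each chunk is handed over as soon as it is ready
            yield DataChunk(self.source_id, f"demo/{month}.parquet", df=[])


async def collect(source: BaseSource) -> list:
    return [chunk async for chunk in source.fetch()]


chunks = asyncio.run(collect(DemoSource()))
```

The consumer only relies on `fetch()` being an async iterator of `DataChunk`, which is why a new source needs no orchestrator or storage changes.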
Retry strategy
HTTP retries use exponential back-off (backoff library). Only transient errors are retried — 4xx responses give up immediately to avoid hammering endpoints that don't support a given area/region combination.
flowchart TD
REQ["HTTP GET"] --> OK{2xx?}
OK -->|Yes| RETURN["return JSON"]
OK -->|No| ERR{Error type}
ERR -->|"5xx or\nTransportError"| RETRY{max_retries\nexceeded?}
RETRY -->|No| BACKOFF["exponential\nback-off"] --> REQ
RETRY -->|Yes| NONE["return None\n(logged)"]
ERR -->|"4xx"| NONE
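The decision flow in the chart can be sketched without third-party packages. This is a hand-rolled loop for illustration, not the `backoff` library's decorator API that the project uses. `HttpError`, `get_with_retry`, and the delay values are hypothetical, and `ConnectionError` stands in for a transport error.

```python
import time
from typing import Callable, Optional


class HttpError(Exception):
    """Hypothetical error type carrying an HTTP status code."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def is_retryable(exc: Exception) -> bool:
    if isinstance(exc, HttpError):
        return exc.status >= 500  # 4xx means retrying cannot help
    return isinstance(exc, ConnectionError)  # stand-in for transport errors


def get_with_retry(request: Callable[[], dict], max_retries: int = 3,
                   base_delay: float = 0.01) -> Optional[dict]:
    for attempt in range(max_retries + 1):
        try:
            return request()
        except Exception as exc:
            if not is_retryable(exc) or attempt == max_retries:
                return None  # the real system logs the failure here
            time.sleep(base_delay * 2 ** attempt)  # exponential back-off
    return None


calls = {"flaky": 0, "missing": 0}


def flaky() -> dict:
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise HttpError(503)  # transient: fails twice, then succeeds
    return {"ok": True}


def missing() -> dict:
    calls["missing"] += 1
    raise HttpError(404)  # unsupported area/region: give up at once


flaky_result = get_with_retry(flaky)
missing_result = get_with_retry(missing)
```

The 503 case is retried until it succeeds on the third call, while the 404 case returns `None` after a single request, which matches the two branches of the flowchart.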