
Building a data pipeline with DuckDB

Published Mar 20, 2025 by Robin Moffatt in DuckDB, ETL, Data Engineering at https://preview.rmoff.net/2025/03/20/building-a-data-pipeline-with-duckdb/

In this blog post I’m going to explore how as a data engineer in the field today I might go about putting together a rudimentary data pipeline. I’ll take some operational data, and wrangle it into a form that makes it easily pliable for analytics work.

After a somewhat fevered and nightmarish period during which people walked around declaring that "Schema on Read" was the future, that "Data is the new oil", and "Look at the size of my big data", the path of IT history is somewhat doubling back on itself towards a more sensible approach to things.

As they say:

What’s old is new

This is good news for me, because I am old and what I knew then is 'new' now ;)

Overview πŸ”—

The Data πŸ”—

This uses Environment Agency flood and river level data from the real-time data API (Beta)

If you’ve been following this blog closely you’ll have seen some of my noodlings around with this data already. I wrote about exploring it with DuckDB and Rill, using it as an excuse to try out the new DuckDB UI, as well as loading it into Kafka and figuring out what working with it in Flink SQL would look like.

At the heart of the data are readings, providing information about measures such as rainfall and river levels. These are reported from a variety of stations around the UK.

data model.excalidraw

The data is available on a public REST API (try it out here to see the current river level at one of the stations in Sheffield).

The Plan πŸ”—

overview.excalidraw

I’m going to make heavy use of DuckDB for this project. It can read data from REST APIs, it can process data, and it can store data. What’s more, it can be queried with various visualisation tools including Rill Data, Superset, and Metabase, as we’ll see later.

We’ll pull the data in using DuckDB’s read_json function and the httpfs core extension.

Once loaded, the dimension data will be brute-force rebuilt based on the latest values. For those who like this kind of thing (and who doesn’t), this is in effect Slowly-Changing Dimension (SCD) type 1. With SCD Type 1, no history is retained. This means that if a measure or station is removed, associated readings previously recorded will not have a match on the corresponding dimension table. If it’s updated, historical readings will be shown with the dimension data as-is now, not as-was then.
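
One way to see that effect once the tables below are built is to count readings whose measure no longer has a match in the rebuilt dimension. This is just a sketch, using the table and column names defined later in this post:

-- Readings left orphaned after a dimension rebuild (the SCD Type 1 trade-off)
SELECT COUNT(*) AS orphaned_readings
FROM readings r
LEFT JOIN measures m ON r.measure = m.notation
WHERE m.notation IS NULL;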

The readings themselves we’ll collect in a fact table, which will mostly just be appended to with each incremental load. It’s not entirely that simple though:

  • Some stations may lag in reporting their data, so we might pull duplicates (i.e. the reading for the last time period when it did report)

  • Some stations may batch their reporting, so we need to handle polling back over a period of time and dealing with the resulting duplicates for readings that had been reported

In addition, historical data is available and we want to be able to include that too.

Once we’ve got our fact and dimension data sorted, we’ll join it into a denormalised table that we can build analytics against.

The Environment πŸ”—

I’m running this all locally on my Mac. The first step is to install DuckDB and a few other useful tools:

brew install duckdb httpie jq

Then fire up DuckDB with its notebook-like UI (you don’t have to use the UI; you can use whatever interface you want):

duckdb env-agency.duckdb -ui

Extract (with just a little bit of transform) πŸ”—

The basic ingest looks like this:

CREATE OR REPLACE TABLE readings_stg AS
    SELECT * FROM read_json('https://environment.data.gov.uk/flood-monitoring/data/readings?latest')

DuckDB automagically determines the schema of the table. We’re going to do one bit of processing at this stage too.

By default all the API calls return a payload made up of metadata and then an array of the actual data. I decided to explode out the array at this point of ingest just to make things a bit easier.

At this point I’m throwing away the @context and meta elements; you may decide you want to keep them.

CREATE OR REPLACE TABLE readings_stg AS
  WITH src AS (SELECT *
                FROM read_json('https://environment.data.gov.uk/flood-monitoring/data/readings?latest'))
    SELECT u.* FROM (
        SELECT UNNEST(items) AS u FROM src);

CREATE OR REPLACE TABLE measures_stg AS
  WITH src AS (SELECT *
                FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/measures'))
    SELECT u.* FROM (
        SELECT UNNEST(items) AS u FROM src);

CREATE OR REPLACE TABLE stations_stg AS
  WITH src AS (SELECT *
                FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/stations'))
    SELECT u.* FROM (
        SELECT UNNEST(items) AS u FROM src);

Let’s see what data we’ve pulled in:

SELECT 'readings_stg' AS table_name, COUNT(*) as row_ct, min(dateTime) as min_dateTime, max(dateTime) as max_dateTime FROM readings_stg
UNION ALL
SELECT 'measures_stg' AS table_name, COUNT(*) as row_ct,NULL,NULL from measures_stg
UNION ALL
SELECT 'stations_stg' AS table_name, COUNT(*) ,NULL,NULL from stations_stg;
table_name	    row_ct	min_dateTime	    max_dateTime
readings_stg	5272	2025-02-21 13:45:00	2025-03-20 13:25:10
measures_stg	6638
stations_stg	5400

The latest dateTime value looks right (it’s 2025-03-20 13:42:45 as I write this) but why is the minimum nearly a month ago? This is where the DuckDB UI’s data viz comes in useful:

dateTime viz

What this shows us is that almost all the data is for the latest timestamp, with just a handful of readings for other dates.

We can prove this out with a SQL query too:

SELECT dateTime, COUNT(*) AS reading_count
    FROM readings_stg
    GROUP BY dateTime
    ORDER BY 2 desc, 1 desc;

dateTime dist

Transform πŸ”—

Keys πŸ”—

The staging tables have no keys defined, because YOLO right? Well no. Staging is where we bring in the source data, warts and all. A station shouldn’t have more than one instance, but who says that’s the case?

Rather than failing the ingest because of a logical data error, it’s our job to work with what we’ve got. That means coding defensively and ensuring that whilst we’ll accept anything into the staging area, we don’t blindly propagate crap through the rest of the pipeline.

One of the ways to enforce this is constraints, of which primary keys are an example.
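
As a minimal sketch of what a primary key buys us (a throwaway table, not part of the pipeline):

CREATE TABLE pk_demo (notation VARCHAR PRIMARY KEY);
INSERT INTO pk_demo VALUES ('5312TH');
-- this second insert fails with a primary key constraint violation
INSERT INTO pk_demo VALUES ('5312TH');
DROP TABLE pk_demo;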

relationships.excalidraw

Readingsβ†’Measures πŸ”—

As ingested, the data in readings relates to measures via the readings.measure column:

http://environment.data.gov.uk/flood-monitoring/id/measures/5312TH-level-stage-i-15_min-mASD

On measures the @id column matches this:

http://environment.data.gov.uk/flood-monitoring/id/measures/5312TH-level-stage-i-15_min-mASD

But this is duplicated in the notation column, minus the http://environment.data.gov.uk/flood-monitoring/id/measures/ URL prefix:

5312TH-level-stage-i-15_min-mASD

We’ll pre-process the readings.measure column to strip this prefix to make the join easier (and simpler to debug, since you’re not wading through columns of long text).
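
As a quick illustration (the real transform is applied at load time further down), stripping the prefix is just a REGEXP_REPLACE; a sketch:

-- Strip the measures URL prefix, leaving just the notation
SELECT REGEXP_REPLACE(
         'http://environment.data.gov.uk/flood-monitoring/id/measures/5312TH-level-stage-i-15_min-mASD',
         'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
         '') AS measure;
-- 5312TH-level-stage-i-15_min-mASD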

Measuresβ†’Stations πŸ”—

The station for which a reading was taken is found via the measure, since measures are unique to a station.

On measures the station column is the foreign key:

http://environment.data.gov.uk/flood-monitoring/id/stations/SP50_72

Again, the URL prefix (http://environment.data.gov.uk/flood-monitoring/id/stations/) is repeated and we’ll strip that out.

One thing that caught me out here is that the station (minus the URL prefix) and the stationReference are almost always the same.

Almost always.

I spent a bunch of time chasing down duplicates after the subsequent join to the fact table resulted in a fan-out, because stationReference isn’t unique.

SELECT stationReference, station
    FROM measures
    WHERE station!=stationReference
    ORDER BY stationReference;
stationReference	station
4063TH	            4063TH-southern
4063TH	            4063TH-thames
E22300	            E22300-anglian
E22300	            E22300-southern
E22300	            E22300-southern
[…]

26 rows out of 6612…enough to cause plenty of trouble when I made assumptions about the data I was eyeballing and missed the 0.4% exceptions…

It does state it clearly in the API doc; station is the foreign key, not stationReference. RTFM, always ;)
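
The defensive check I should have run first is to confirm that the intended join key is actually unique on the dimension side. A sketch (this is the query that would have caught my stationReference assumption):

-- If this returns any rows, the column is not safe to join on
SELECT stationReference, COUNT(*) AS station_ct
FROM stations_stg
GROUP BY stationReference
HAVING COUNT(*) > 1
ORDER BY station_ct DESC;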

Dimension tables πŸ”—

Building the dimension tables is simple enough, if crude. With a CREATE OR REPLACE we tell DuckDB to go ahead and create the table, and if it exists already, nuke it and create a fresh version.

The transformation we’ll do is pretty light.

Measures πŸ”—

We’re going to drop a couple of fields:

  • @id, which we don’t need

  • latestReading holds fact data that we’re getting from elsewhere, so no point duplicating it here

We’ll also transform the foreign key to strip the URL prefix making it easier to work with.

CREATE OR REPLACE TABLE measures AS
    SELECT *
            EXCLUDE ("@id", latestReading)
            REPLACE(
                REGEXP_REPLACE(station,
                        'http://environment\.data\.gov\.uk/flood-monitoring/id/stations/',
                        '') AS station
            )
    FROM measures_stg;

This is using a couple of my favourite recent discoveries in DuckDBβ€”the EXCLUDE and REPLACE clauses.

With EXCLUDE we’re taking advantage of SELECT * to bring in all columns from the source tableβ€”which saves typing, but also means new columns added to the source will propagate automagicallyβ€”but with the exception of ones that we don’t want.

The REPLACE clause is a really elegant way of providing a different version of the same column. Since we want to retain the station column but just trim the prefix, this is a great way to do it without moving its position in the column list. The other option would have been to EXCLUDE it too, and then add it on to the column list.
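
For comparison, here’s what that other option would look like, as a sketch (note that station ends up at the end of the column list):

CREATE OR REPLACE TABLE measures AS
    SELECT * EXCLUDE ("@id", latestReading, station),
           REGEXP_REPLACE(station,
                   'http://environment\.data\.gov\.uk/flood-monitoring/id/stations/',
                   '') AS station
    FROM measures_stg;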

With the table created let’s define the primary key as discussed above:

ALTER TABLE measures
    ADD CONSTRAINT measures_pk PRIMARY KEY (notation);

Stations πŸ”—

Very similar to the process above:

CREATE OR REPLACE TABLE stations AS
    SELECT * EXCLUDE (measures)
    FROM stations_stg;

ALTER TABLE stations
    ADD CONSTRAINT stations_pk PRIMARY KEY (notation);

One thing that you might also do is move the primary key (notation) to be the first column in the table. This is a habit I picked up years ago; I don’t know if it’s still common practice. To do it you’d EXCLUDE the field and manually prefix it to the star expression:

CREATE OR REPLACE TABLE stations AS
    SELECT notation, * EXCLUDE (measures, notation)
    FROM stations_stg;

ALTER TABLE stations ADD CONSTRAINT stations_pk PRIMARY KEY (notation);

(If you do this, you’d want to logically do the same for the other tables' PKs too).

Fact table πŸ”—

This is where things get fun :)

Because we’re going to be adding to the table with new data rather than replacing it, we can’t just CREATE OR REPLACE it each time. Therefore we’ll run the CREATE as a one-off:

CREATE TABLE IF NOT EXISTS readings AS
    SELECT * EXCLUDE "@id" FROM readings_stg WHERE FALSE;

A few notes:

  • IF NOT EXISTS makes sure we don’t overwrite the table. We’d get much the same effect with a plain CREATE TABLE; the only difference is that it would fail if the table already exists, whilst IF NOT EXISTS exits silently.

  • We’re going to EXCLUDE the @id column because we don’t need it

  • This will only create the table using the schema projected by the SELECT; the WHERE FALSE means no rows will be selected. This is so that we decouple the table creation from its population.
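
To sanity-check that we got the projected schema but no rows, a quick sketch:

-- Confirm the schema was projected and the table is empty
DESCRIBE readings;
SELECT COUNT(*) AS row_ct FROM readings;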

Now we’ll add a primary key. The key here is the time of the reading (dateTime) plus the measure (measure).

ALTER TABLE readings
    ADD CONSTRAINT readings_pk PRIMARY KEY (dateTime, measure);

Populating the fact table πŸ”—

Our logic here is: "Add data if it’s new, don’t throw an error if it already exists". Our primary key for this is the time of the reading and the measure. If we receive a duplicate we’re going to ignore it.

We’re making a design choice here; in theory we could decide that a duplicate reading represents an update to the original (re-stating a fact that could have been wrong previously) and handle it as an UPSERT (i.e. INSERT if new, UPDATE if existing).

DuckDB has some very nice syntax available around the INSERT INTO … SELECT FROM pattern. To achieve what we want we use the self-documenting statement INSERT OR IGNORE. This is a condensed version of the more verbose INSERT INTO… SELECT FROM… ON CONFLICT DO NOTHING syntax.

INSERT OR IGNORE INTO readings
SELECT *
        EXCLUDE "@id"
        REPLACE(
            REGEXP_REPLACE(measure,
            'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
            '') AS measure)
FROM readings_stg

We’re using the same EXCLUDE and REPLACE expressions as we did above; remove the @id column, and strip the URL prefix from the foreign key measure.

The first time we run this we can see the number of INSERTS:

changes: 5272

Then we re-run it:

changes:   0

Since nothing changed in the staging table, this makes sense. Let’s load the staging table with the latest data again:

changes: 4031
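
Incidentally, the longhand form of that INSERT OR IGNORE statement (and the place where you’d switch to the UPSERT behaviour mentioned above) looks something like this sketch:

-- Longhand equivalent of INSERT OR IGNORE
INSERT INTO readings
SELECT *
        EXCLUDE "@id"
        REPLACE(
            REGEXP_REPLACE(measure,
            'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
            '') AS measure)
FROM readings_stg
ON CONFLICT DO NOTHING;
-- For UPSERT semantics you could instead use ON CONFLICT DO UPDATE SET value = EXCLUDED.value
-- (and so on for any other columns you want restated).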

Joining the data πŸ”—

Similar to the fact table above, we’re going to be incrementally loading this final, denormalised, table. I’m taking a slightly roundabout tack to do this here.

First, I’ve defined a view which is the result of the join:

CREATE OR REPLACE VIEW vw_readings_enriched AS
    SELECT  "r_\0": COLUMNS(r.*),
            "m_\0": COLUMNS(m.*),
            "s_\0": COLUMNS(s.*)
            FROM
            readings r
            LEFT JOIN measures m ON r.measure = m.notation
            LEFT JOIN stations s ON m.station = s.notation

See my earlier blog post if you’re not familiar with the COLUMNS syntax.

From the view I create the table’s schema (but don’t populate anything yet):

CREATE TABLE IF NOT EXISTS readings_enriched AS
    SELECT * FROM vw_readings_enriched LIMIT 0;

ALTER TABLE readings_enriched
    ADD CONSTRAINT readings_enriched_pk PRIMARY KEY (r_dateTime, r_measure);

And now populate it in the same way as we did for the readings table:

INSERT OR IGNORE INTO readings_enriched
    SELECT * FROM vw_readings_enriched;

Query the joined data πŸ”—

Now that we’ve got our joined data we can start to query and analyse it. Here are the five most recent readings for all water level measurements on the River Wharfe:

SELECT r_dateTime
		, s_label
		, r_value
FROM readings_enriched
WHERE s_riverName = 'River Wharfe' AND m_parameterName = 'Water Level'
ORDER BY r_dateTime desc LIMIT 5 ;
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚     r_dateTime      β”‚               s_label                β”‚ r_value β”‚
β”‚      timestamp      β”‚                 json                 β”‚ double  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ 2025-03-19 15:00:00 β”‚ "Kettlewell"                         β”‚   0.171 β”‚
β”‚ 2025-03-19 15:00:00 β”‚ "Cock Beck Sluices"                  β”‚   3.598 β”‚
β”‚ 2025-03-19 15:00:00 β”‚ "Nun Appleton Fleet Pumping Station" β”‚   2.379 β”‚
β”‚ 2025-03-19 15:00:00 β”‚ "Tadcaster"                          β”‚   0.227 β”‚
β”‚ 2025-03-19 15:00:00 β”‚ "Netherside Hall"                    β”‚   0.319 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Historical data πŸ”—

The readings API includes the option for specifying a date range. However, there is a hard limit of 10000 rows, and since a single time period’s readings for all stations is about 5000 rows, this doesn’t look like a viable option if we want to backfill data for all stations.

Historic readings are available, although in CSV format rather than the JSON we’re used to. Nothing like real-world data engineering problems to keep us on our toes :)

$ http https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-03-18.csv |head -n2
dateTime,measure,value
2025-03-18T00:00:00Z,http://environment.data.gov.uk/flood-monitoring/id/measures/531166-level-downstage-i-15_min-mAOD,49.362

Fortunately, DuckDB has us covered, and takes it in its stride:

πŸŸ‘β—— SELECT * FROM 'https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-03-18.csv' LIMIT 1;
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚      dateTime       β”‚                                             measure                                              β”‚  value  β”‚
β”‚      timestamp      β”‚                                             varchar                                              β”‚ varchar β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ 2025-03-18 00:00:00 β”‚ http://environment.data.gov.uk/flood-monitoring/id/measures/531166-level-downstage-i-15_min-mAOD β”‚ 49.362  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

…or almost in its strideβ€”once I ran it on a full file I got this:

CSV Error on Line: 388909
Original Line:
2025-03-17T22:30:00Z,http://environment.data.gov.uk/flood-monitoring/id/measures/690552-level-stage-i-15_min-m,0.770|0.688
Error when converting column "value". Could not convert string "0.770|0.688" to 'DOUBLE'

Column value is being converted as type DOUBLE
This type was auto-detected from the CSV file.
[…]

Bravo for such a verbose and useful error message. Not just "there’s an error" or "could not convert"; it tells you where, shows you the line, and makes it super easy to understand the problem and what to do about it.

What to do? Brush it under the carpet and pretend it didn’t happen! In other words, ignore_errors=true:

CREATE OR REPLACE TABLE readings_historical AS
  SELECT *
    FROM read_csv('https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-03-18.csv',
                    ignore_errors=true)

This loads all 476k rows of data for 18th March into a new table. Now we’ll add the previous days tooβ€”and head out to the shell to do it:

❯ duckdb env-agency.duckdb -c "INSERT INTO readings_historical SELECT * FROM read_csv('https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-03-16.csv', ignore_errors=true);"
100% β–•β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–
Run Time (s): real 16.405 user 1.090767 sys 0.516826

Even more concise is the COPY option:

duckdb env-agency.duckdb -c "COPY readings_historical FROM 'https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-03-14.csv' (IGNORE_ERRORS);"
Run Time (s): real 3.275 user 1.718801 sys 0.247875

Why am I doing this from the shell? So that I can then do this:

start_date="2025-01-01"
end_date="2025-03-13"

current_date=$start_date
while [[ "$current_date" < "$end_date" || "$current_date" == "$end_date" ]]; do
    echo "Processing $current_date..."
    duckdb env-agency.duckdb -c \
        "COPY readings_historical
        FROM 'https://environment.data.gov.uk/flood-monitoring/archive/readings-$current_date.csv'
        (IGNORE_ERRORS);"
    # note: `date -d` is GNU date syntax; on stock macOS you may need
    # `date -j -f "%Y-%m-%d" -v+1d "$current_date" "+%Y-%m-%d"`, or coreutils' gdate
    current_date=$(date -d "$current_date + 1 day" +%Y-%m-%d)
done

The readings_historical table now holds a nice big chunk of data (not Big Data, just a big chunk of normally-sized data):

historical

Now to merge this into the main table:

πŸŸ‘β—— INSERT OR IGNORE INTO readings
    SELECT *
            REPLACE(
                REGEXP_REPLACE(measure,
                'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
                '') AS measure)
    FROM readings_historical;
Run Time (s): real 0.003 user 0.002708 sys 0.000571
Conversion Error:
Could not convert string '0.772|0.692' to DOUBLE

Here’s the problem with taking the easy route. By letting DuckDB guess at the data types for the CSV data, we’ve ended up with dodgy data being ingested. How much dodgy data? 0.01%…

πŸŸ‘β—— SELECT COUNT(*) FROM readings_historical WHERE TRY_CAST(value AS DOUBLE) IS NULL ;
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ count_star() β”‚
β”‚    int64     β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚     3202     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

It took a few minutes to load the historical data, so instead of ditching the table let’s just deal with what we’ve got. First up, what is the dodgy data?

πŸŸ‘β—— SELECT value
    FROM readings_historical
    WHERE TRY_CAST(value AS DOUBLE) IS NULL
    USING SAMPLE 0.5%;
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚    value    β”‚
β”‚   varchar   β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ 2.415|2.473 β”‚
β”‚ 1.496|1.489 β”‚
β”‚ 1.730|1.732 β”‚
β”‚ 1.419|1.413 β”‚
β”‚ 1.587|1.586 β”‚
β”‚ 1.097|1.101 β”‚
β”‚ 1.032|1.033 β”‚
β”‚ 0.866|0.874 β”‚
β”‚ 0.864|0.862 β”‚
β”‚ 0.861|0.862 β”‚
β”‚ 0.386|0.387 β”‚
β”‚ 1.118|1.062 β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚   12 rows   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

It looks like they all follow this pattern of two valid-looking values separated by a pipe |. We can double-check this:

πŸŸ‘β—— SELECT * FROM readings_historical
    WHERE value NOT LIKE '%|%'
    AND TRY_CAST(value AS DOUBLE) IS NULL;
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ dateTime  β”‚ measure β”‚  value  β”‚
β”‚ timestamp β”‚ varchar β”‚ varchar β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚            0 rows             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

We’ll make an executive decision to take the first value in these pairs, using REPLACE to override value with the result of splitting the string on the pipe and taking the first element.

INSERT OR IGNORE INTO readings
    SELECT *
            REPLACE(
                REGEXP_REPLACE(measure,
                'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
                '') AS measure,
                SPLIT_PART(value, '|', 1) AS value)
    FROM readings_historical
    WHERE value LIKE '%|%';

Now we can load the rest of the data:

πŸŸ‘β—— INSERT OR IGNORE INTO readings
        SELECT *
                REPLACE(
                    REGEXP_REPLACE(measure,
                    'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
                    '') AS measure)
        FROM readings_historical
        WHERE value NOT LIKE '%|%';
100% β–•β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–
Run Time (s): real 218.189 user 213.713807 sys 3.481680
changes: 37031770   total_changes: 37034972

The data’s loaded:

πŸŸ‘β—— SELECT COUNT(*) as row_ct,
            min(dateTime) as min_dateTime,
            max(dateTime) as max_dateTime
    FROM readings;
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  row_ct  β”‚    min_dateTime     β”‚    max_dateTime     β”‚
β”‚  int64   β”‚      timestamp      β”‚      timestamp      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ 37044275 β”‚ 2025-01-01 00:00:00 β”‚ 2025-03-20 15:25:10 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Now to load it into the denormalised tableβ€”for this it’s the same query as when we’re just loading incremental changes:

INSERT OR IGNORE INTO readings_enriched
    SELECT * FROM vw_readings_enriched;

Let’s "Operationalise" it πŸ”—

Let’s have a look at a very rough way of running the update for this pipeline automatically.

We’ll create a SQL script to update the dimension data:

dimensions.sql
-- Load the staging data from the REST API
CREATE OR REPLACE TABLE measures_stg AS
  WITH src AS (SELECT *
                FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/measures'))
    SELECT u.* FROM (
        SELECT UNNEST(items) AS u FROM src);

CREATE OR REPLACE TABLE stations_stg AS
  WITH src AS (SELECT *
                FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/stations'))
    SELECT u.* FROM (
        SELECT UNNEST(items) AS u FROM src);

-- Rebuild dimension tables
CREATE OR REPLACE TABLE measures AS
    SELECT *
            EXCLUDE ("@id", latestReading)
            REPLACE(
                REGEXP_REPLACE(station,
                        'http://environment\.data\.gov\.uk/flood-monitoring/id/stations/',
                        '') AS station
            )
    FROM measures_stg;
ALTER TABLE measures
    ADD CONSTRAINT measures_pk PRIMARY KEY (notation);

CREATE OR REPLACE TABLE stations AS
    SELECT * EXCLUDE (measures)
    FROM stations_stg;

ALTER TABLE stations
    ADD CONSTRAINT stations_pk PRIMARY KEY (notation);

and load the fact data:

fact.sql
-- Load the staging data from the REST API
CREATE OR REPLACE TABLE readings_stg AS
  WITH src AS (SELECT *
                FROM read_json('https://environment.data.gov.uk/flood-monitoring/data/readings?latest'))
    SELECT u.* FROM (
        SELECT UNNEST(items) AS u FROM src);

-- Merge into the fact table
INSERT OR IGNORE INTO readings
SELECT *
        EXCLUDE "@id"
        REPLACE(
            REGEXP_REPLACE(measure,
            'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
            '') AS measure)
FROM readings_stg;

-- Merge into the denormalised table
INSERT OR IGNORE INTO readings_enriched
    SELECT * FROM vw_readings_enriched;

Now to schedule it. An entire industry has been built around workflow scheduling tools; I’m going to stick to the humble cron. It’s simple, it’s quick, and LLMs have now learnt how to write the syntax ;)

crontab

Well, the syntax to invoke DuckDB is a bit different from what Claude thought, but the fiddly */15 stuff it nailed.

Here’s the crontab I set up (crontab -e):

# Run the pipeline every 15 minutes
*/15 * * * * cd ~/env-agency/ && /opt/homebrew/bin/duckdb env-agency.duckdb -f dimensions.sql && /opt/homebrew/bin/duckdb env-agency.duckdb -f fact.sql

Every fifteen minutes this pulls down the latest data, rebuilds the dimension tables, and adds the new data to the fact table.
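
A quick way to check that the job is keeping up is to look at how fresh the fact table is; a sketch:

-- The most recent reading loaded; it should track within roughly 15 minutes of wall-clock time
SELECT MAX(dateTime) AS latest_reading FROM readings;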

Analysing the data πŸ”—

Let’s finish off by looking at how we can analyse the data.
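
Before reaching for a BI tool it’s worth remembering that plain SQL against the denormalised table already gets you a long way. For example, a daily average water level for stations on the River Wharfe might look something like this sketch (reusing the columns from the earlier query):

-- Daily average water level per station on the River Wharfe
SELECT DATE_TRUNC('day', r_dateTime) AS reading_date,
       s_label,
       AVG(r_value) AS avg_level
FROM readings_enriched
WHERE s_riverName = 'River Wharfe'
  AND m_parameterName = 'Water Level'
GROUP BY reading_date, s_label
ORDER BY reading_date DESC, s_label;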

Metabase πŸ”—

Metabase is fairly quick to get up and running. The complication is that to query DuckDB you need a driver that MotherDuck have created, which I couldn’t get to work under Docker; hence running Metabase locally:

# You need Java 21
sdk install java 21.0.6-tem

# Download Metabase & Metaduck
mkdir metabase && cd metabase
curl https://downloads.metabase.com/v0.53.7/metabase.jar -O
# -L follows the GitHub release redirect to the actual jar
mkdir plugins && pushd plugins && curl -L https://github.com/motherduckdb/metabase_duckdb_driver/releases/download/0.2.12-b/duckdb.metabase-driver.jar -O && popd

# Run metabase
java --add-opens java.base/java.nio=ALL-UNNAMED -jar metabase.jar

This launches Metabase on http://localhost:3000, and after an annoying on-boarding survey, it’s remarkably quick to get something created. Adding the database is simple enough:

metabase add db

Once you’ve done that Metabase automagically (I’m surprised it doesn’t say "AI" when it does it) offers a summary of the data in the table:

metabase wizard

It’s not a bad starter for ten; the count of rows is useful from a data-completeness point of view. We’d need to do some work to define the value metric and perhaps some geographic hierarchiesβ€”but there’s definitely lots of potential.

metabase db

You can also poke around in the data itself with a tabular slice & dice approach:

metabase table

Rill πŸ”—

I used Rill previously and liked it.

Getting it up and running is easy:

# Install…
curl https://rill.sh | sh

# …and go!
rill start rill-env-agency

Rill supports DuckDB out of the box, so adding our data source is easy:

rill add

Who can resist a bit of AI magic?

rill ai

As with Metabase, it’s a pretty good starting point for customising into what you want to analyse.

rill db

With a bit of playing around you can create a nice comparison between January and February’s readings for stations on a given river:

rill compare

I couldn’t figure out how to plot a time series of values for a series of data, but as my children would say to me, that’s probably a skill issue on my part…

Superset πŸ”—

This is a bit heavier to install than Metabase, and definitely more so than Rill. After an aborted attempt to install it locally I went the Docker route even though it meant a bit of fiddling to get the DuckDB dependency installed.

Follow the steps in the Quickstart to clone the repo, and then modify the command for the superset service to install the DuckDB dependencies before launching Superset itself:

    command: ["sh", "-c", "pip install duckdb duckdb-engine && /app/docker/docker-bootstrap.sh app-gunicorn"]

Now bring up Superset:

docker compose -f docker-compose-image-tag.yml up

You’ll find Superset at http://localhost:8088β€”note that it does take a few minutes to boot up, so don’t be impatient if it doesn’t seem to be working straight away.

After quite a lot of fiddling around to get this far, I realised that my DuckDB file is on my host machine, not in the Docker container. I couldn’t just mount it as a volume, because the compose file already mounts volumes using a syntax I wasn’t familiar enough with to know how to add to:

    volumes:
      *superset-volumes

Instead I just did a bit of a hack and copied the file onto the container:

docker compose cp ~/env-agency/env-agency.duckdb superset:/tmp/

Finally, within Superset, I could add the database (Settings β†’ Manage Databases). Somewhat confusingly, if you select "DuckDB" as the type, you’re asked for "DuckDB Credentials" and a Motherduck access token; click the small Connect this database with a SQLAlchemy URI string instead link (or just select "Other" database type in the first place).

Specify the local path to your DuckDB file, for example:

duckdb:////tmp/env-agency.duckdb

ss db

Next, create a Datasetβ€”select the database you just defined, and the readings_enriched table:

ss ds

After all that, fortunately, Superset has a rich set of functionality particularly when it comes to charting. I did hit frequent time-outs when experimenting, but managed to create a nice time-series chart fairly easily:

ss viz1

Wrapping up πŸ”—

overview.excalidraw

We’ve cobbled together a pipeline that extracts data from a set of REST APIs, applies some light cleansing and transformation, and loads it into a DuckDB table from where it can be queried and analysed. With cron we’ve automated the refresh of this data every fifteen minutes.

The total bill of materials is approximately:

  • 1 x DuckDB

  • 14 SQL statements (16 if you include historical backfill)

  • 1 ropey cron job

Would this pass muster in a real deployment? You tell me :)

My guess is that I’d not want to be on the hook to support it, but it’d do the job until it didn’t. That is, as a prototype with sound modelling to expand on later it’s probably good enough.

But I’d love to hear from folk who are building this stuff for real day in, day out. What did I overlook here? Is doing it in DuckDB daft? Let me know on Bluesky or LinkedIn.

You can find the full set of SQL files to run this here.

Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
