How to transform cycling training data with dbt

Feb 18, 2024

This is part 3 of a personal project called antren.

To recap where in the pipeline we are: my cycling training data is now in BigQuery, in a table found at antren_app.activities_raw, which looks something like this:

activity_id   start_time   data
13947598259   1707845190   "{'time': [1707845190, 1707845191, 1707845192, ...], 'watts': [425, 399, 384, ...], 'heart_rate': [141, 141, 142, ...]}"

The idea behind storing the activity ticks in a JSON-like format was to make this step more challenging.

The outcome

The result of this step is a reporting-ready table, rpt__daily_rolling_peaks, which tracks peak average power across different durations. For example, what is my peak power over 1 minute, 5 minutes or 20 minutes? And what is the peak over the last 14, 28 or 84 days? It looks something like this:

date_day              max_peak_1min_watts_r14d   max_peak_1min_watts_r28d   max_peak_1min_watts_r84d
2024-02-18T00:00:00   353.9                      434.8                      434.8

This is a subset of the columns available. Every row represents a day and there's a column for:

  • every duration I want to track (in seconds): [5, 15, 30, 60, 300, 600, 1200, 3600]
  • every lookback window I want to get a max for: [14, 28, 84] (1 + 8 * 3 = 25 columns in total)

The resulting table is ready to be used in a BI dashboard as I did in this publicly-accessible hex project. Let's get into the transformation steps.

Repo: alhankeser/antren-dbt

dbt setup and structure

I used dbt-core for this project and implemented the typical dbt structure to get started (see repo). This is the overview of the models directory:

models/
|-- marts/
|   |-- _marts.yml
|   |-- rpt__daily_rolling_peaks.sql
|   |-- ...
|-- staging/
|   |-- _staging.yml
|   |-- stg__activities_points.sql
|   |-- ...
|-- sources/
|   |-- _sources.yml
|   |-- src__activities.sql
|   |-- ...

My dev runs go to a dedicated schema dbt_alhan.
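
For context, the dev target in profiles.yml is a standard dbt-bigquery setup. Mine looks roughly like this; the profile name and project id below are placeholders, not the real values:

# ~/.dbt/profiles.yml (abridged sketch)

antren:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth
      project: my-gcp-project
      dataset: dbt_alhan
      threads: 4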

Sources

My sources in this project are simple: I have a single table with a row per activity. Each activity has columns for activity_id and start_time. A JSON-like column, data, has keys for time (a Unix timestamp for each second of the activity), heart_rate (beats per minute) and watts (as measured by a cycling power meter).

with
    base as (select * from {{ source("antren_app", "activities") }}),

    final as (
        select
            activity_id as id, 
            start_time as start_time_ts, 
            data as points
        from base
    )

select * from final

{% if target.name == 'dev' %}
    where timestamp_seconds(start_time_ts) > timestamp_sub(current_timestamp(), interval 14 day)
{% endif %}

To save time during development, I only grab the last 2 weeks of rides using the {% if target.name == 'dev' %} where clause.
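
The source itself is declared in _sources.yml, pointing the friendly name activities at the raw table from the previous step. The exact file in the repo may differ, but it looks roughly like this:

# models/sources/_sources.yml (abridged sketch)

version: 2

sources:
  - name: antren_app
    tables:
      - name: activities
        identifier: activities_raw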

Staging

In my staging layer, my goal was to solve the following problems:

  • Unnest activity data points into columns
  • Fill gaps in activity data (e.g. where the activity was paused) with zeros.
  • Create rolling average watts columns for my list of target durations.
  • Get max of each rolling average column per activity.

I purposefully hardcoded certain columns so that I can write about how to refactor this for improved flexibility downstream. Here is how I went about solving the problems above, step-by-step:

Unnesting data points from json-like columns

The outcome of this step is an incremental model (see in GitHub) that looks something like this:

id            start_time_ts   end_time_ts   ts           watts   heart_rate
12498270976   1698483182      1698484099    1698483853   105     103
12498270976   1698483182      1698484099    1698483745   103     108
12498270976   1698483182      1698484099    1698483781   111     108

I start by unnesting the data point types, creating array columns for each: timestamp, watts and heart rate.

-- models/staging/stg__activities_points.sql

activities_arrays as (
    select
        id,
        start_time_ts,
        start_time_utc,
        json_extract_array(points, '$.time') as ts_array,
        json_extract_array(points, '$.watts') as watts_array,
        json_extract_array(points, '$.heart_rate') as heart_rate_array
    from activities
    {% if is_incremental() %}
        where start_time_utc > (select max(start_time_utc) from {{ this }})
    {% endif %}
),

To create a "timestamp spine" based on activity start and end, I first get the end timestamp for each activity:

-- models/staging/stg__activities_points.sql

activities_end_times as (
    select
        id,
        cast(max(ts) as int64) as end_time_ts
    from activities_arrays,
        unnest(ts_array) as ts
    group by id
),

Then I use the BigQuery generate_array function to create the fully filled-in array of timestamps from the start to the end of the activity. activities_filled_in_ts_unnested will serve as the "spine" for my activity data points. I subtract the largest peak_time_ranges value from the start time so that the array begins prior to the actual start time, and all rolling averages correctly include the zeros prior to the activity start (yeah, there are better solutions):

-- models/staging/stg__activities_points.sql

activities_filled_in_ts as (
    select
        a.id,
        e.end_time_ts,
        generate_array(
            a.start_time_ts - {{ var("peak_time_ranges") | max }}, e.end_time_ts
        )
            as ts_array
    from activities_arrays as a
    inner join activities_end_times as e
        on a.id = e.id
),
activities_filled_in_ts_unnested as (
    select
        id,
        end_time_ts,
        ts
    from activities_filled_in_ts,
        unnest(ts_array) as ts
),

I unnest the arrays of activity points:

-- models/staging/stg__activities_points.sql

activities_unnested as (
    select
        id,
        start_time_ts,
        start_time_utc,
        cast(ts_value as int64) as ts,
        cast(watts_value as int64) as watts,
        cast(heart_rate_value as int64) as heart_rate
    from
        activities_arrays,
        unnest(ts_array) as ts_value with offset as ts_offset,
        unnest(watts_array) as watts_value with offset as watts_offset,
        unnest(heart_rate_array) as heart_rate_value with offset
            as heart_rate_offset
    where
        ts_offset = watts_offset
        and ts_offset = heart_rate_offset
),

And finally I join the unnested activity points along my activity timestamp spine:

-- models/staging/stg__activities_points.sql

joined as (
    select
        f.id,
        f.end_time_ts,
        f.ts,
        max(a.start_time_ts) over (partition by f.id) as start_time_ts,
        max(a.start_time_utc) over (partition by f.id) as start_time_utc,
        coalesce(a.watts, 0) as watts,
        coalesce(a.heart_rate, 0) as heart_rate
    from activities_filled_in_ts_unnested as f
    left join activities_unnested as a
        on
            f.id = a.id
            and f.ts = a.ts
),

So now we have a second-by-second list of points per bike ride, with columns for time, watts and heart rate. Nice! Let's move on to turning this into some metrics.

Using macros to write less SQL

One set of metrics I like to track to see if I'm actually getting faster is my peak average power at varying durations. For example, what is the highest average power I've held for 1 minute? Depending on what I'm training for, I may be more interested in shorter durations (as short as 5 or 15 seconds) or longer durations (20 minutes or 1 hour). There are many other metrics out there; these are just the few I happen to care about the most, as they measure outcomes. I could certainly also measure process metrics like hours, kilojoules, etc.

To get my peak rolling averages, I need to:

  1. Calculate the rolling averages at varying durations that I care about.
  2. Get the peak average per duration, per activity.

Thanks to the macro get_rolling_averages, my model looks as simple as this:

-- models/staging/stg__activities_peaks.sql

activities_rolling_averages as (
	select
		activities_points.*,
		{{ get_rolling_averages(
							column="watts", 
							time_ranges=var("peak_time_ranges"), 
							partition_by_columns="id", 
							order_by_columns="ts")
		}}
	from activities_points
),

Let me break it down:

get_rolling_averages calculates rolling averages of a given column, over time_ranges, partitioned by a list of columns, and ordered by some more column(s). Maybe I didn't need to explain that, but I just did.

-- macros/get_rolling_averages.sql

{% macro get_rolling_averages(column, time_ranges, partition_by_columns="id", order_by_columns="ts") -%}

{%- for time_range in time_ranges %}
            avg({{ column }}) over (
                        partition by {{ partition_by_columns }} 
                        order by {{ order_by_columns }} 
                        range between {{ time_range }} preceding and current row
                    ) as rolling_{{ get_duration_label(time_range) }}_{{ column }}
            {%- if not loop.last -%}
                ,
            {%- endif -%}
{%- endfor -%}
{% endmacro %}
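
For a single duration, say 60 seconds, the rendered SQL looks roughly like this (assuming get_duration_label(60) returns "1min", which matches the column names in the report table):

-- rendered by get_rolling_averages for time_range = 60 (illustrative)
avg(watts) over (
    partition by id
    order by ts
    range between 60 preceding and current row
) as rolling_1min_watts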

var("peak_time_ranges") is defined in vars in my dbt_project.yml.

vars:
  peak_time_ranges: [5, 15, 30, 60, 300, 600, 1200, 3600]
  training_start_date: "2014-01-01"
  lookback_windows: [14, 28, 84]

Then we get the peak per duration, in a similar fashion, using another macro:

-- models/staging/stg__activities_peaks.sql

activities_peaks as (
    select
        id,
        start_time_ts,
        start_time_utc,
        end_time_ts,
        {{
            get_peak_rolling_averages(
                column="watts",
                time_ranges=var("peak_time_ranges"))
        }}
    from activities_rolling_averages
    group by
        id,
        start_time_ts,
        start_time_utc,
        end_time_ts
),

Here's the corresponding macro, which uses its own little macro, get_duration_label, to produce a friendlier label for each duration (e.g. "1min" instead of 60 seconds, or "5min" instead of 300 seconds, etc.):

-- macros/get_peak_rolling_averages.sql

{% macro get_peak_rolling_averages(column, time_ranges) -%}

{%- for time_range in time_ranges %}
            max(rolling_{{ get_duration_label(time_range) }}_{{ column }}) as peak_{{ get_duration_label(time_range) }}_{{ column }}
            {%- if not loop.last -%}
                ,
            {%- endif -%}
{%- endfor -%}
{% endmacro %}
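
get_duration_label itself isn't shown here; the real macro is in the repo. A minimal sketch of the idea, based on the labels used elsewhere in this post (the "hr" label for 3600 is my guess):

-- macros/get_duration_label.sql (illustrative sketch, not the repo version)

{%- macro get_duration_label(time_range) -%}
    {%- if time_range < 60 -%}
        {{- time_range -}}sec
    {%- elif time_range < 3600 -%}
        {{- time_range // 60 -}}min
    {%- else -%}
        {{- time_range // 3600 -}}hr
    {%- endif -%}
{%- endmacro -%}

The important part is just that it produces a compact, stable token that can be embedded in column names.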

We end up with a model that looks something like this at this point:

id            start_time_utc            peak_5sec_watts   peak_15sec_watts   ...
1             2018-12-23 12:43:21 UTC   1092.16           800.87             ...

Reporting layer

In order to track my progress, I don't really care about what happened in a particular ride. What I want to see is whether, in my recent past, I've been getting faster compared to my longer-term history, or at least compared to the same point last year (or multiple seasons back).

For my one and only report at this time, I want to get a rolling max of my various peak power at the various durations. Yes: a rolling max of my rolling average powers.

I could have used dbt's semantic layer approach for this, but if I did that then I wouldn't get to write another post about how I'm refactoring my current approach to that approach, would I? So we're going to do this old school for now.

Again, I don't want to write a lot of SQL that I'd have to maintain, so I'm using macros any time I'd otherwise have to write out a long list of anything. Here is the one and only report table, in its entirety:

-- models/marts/reports/rpt__daily_rolling_peaks.sql

{{ config(materialized="table") }}

{% set peak_columns = dbt_utils.star(
                            from=ref("stg__activities_peaks"), 
                            except=["id",
                                    "start_time_ts",
                                    "start_time_utc", 
                                    "end_time_ts"]).split(",")
%}
{% set lookbacks_columns = get_max_over_lookbacks_columns(
	                            columns=peak_columns, 
	                            lookback_windows=var("lookback_windows")
	                        )
%}

with
    activities_peaks as (select * from {{ ref("stg__activities_peaks") }}),

    dates as (select * from {{ ref("util__dates") }}),

    maxes_per_day as (
        select 
            dates.date_day,
            {{
                get_max_over_lookbacks(
                    columns=peak_columns, 
                    order_by_columns="unix_date(cast(dates.date_day as date))",
                    lookback_windows=var("lookback_windows")
                )
            }}
        from dates
        left join activities_peaks a 
            on cast(a.start_time_utc as date) = dates.date_day
        where
            dates.date_day between cast(
                '{{ var("training_start_date") }}' as datetime
            ) and cast(current_date() as datetime)
    ),
    
    final as (
        select
            date_day,
            {{ get_max(lookbacks_columns) }}
        from maxes_per_day
        group by date_day
    )

select *
from final

peak_columns is generating my list of columns that look like peak_x_watts (there are a variable number of them).

lookbacks_columns is generating the list of columns that will exist in maxes_per_day, so I don't have to write them out by hand.
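
The lookback macros themselves aren't shown here; the real versions are in the repo. As a rough sketch of the idea behind get_max_over_lookbacks (the actual implementation may handle the column strings from dbt_utils.star differently):

-- macros/get_max_over_lookbacks.sql (rough sketch; see the repo for the real version)

{% macro get_max_over_lookbacks(columns, order_by_columns, lookback_windows) -%}
    {%- for column in columns -%}
        {%- set outer_loop = loop -%}
        {%- for lookback_window in lookback_windows %}
            max({{ column | trim }}) over (
                order by {{ order_by_columns }}
                range between {{ lookback_window }} preceding and current row
            ) as max_{{ column | trim }}_r{{ lookback_window }}d
            {%- if not (outer_loop.last and loop.last) -%},{%- endif -%}
        {%- endfor -%}
    {%- endfor -%}
{% endmacro %}

Presumably, get_max_over_lookbacks_columns just produces the matching max_..._r14d / _r28d / _r84d names, and get_max wraps each of them in max(...) for the final group by date_day.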

Visualizing in hex

I have a lot of experience using Looker, but since this project is personal, very small and there's nothing that Looker does that I really need, I decided to try out hex for visualization. It was a very smooth experience getting started and creating my first publicly-visible report.

See the dashboard

Deploying dbt with Docker

After having gone through the fun of getting Docker to do what I wanted with my Python script, setting up again for dbt was not a major hassle. Here is the only special part of my Dockerfile worth mentioning:

ARG env
# persist the build args as env vars so they're available when CMD runs at container runtime
ENV env=$env
ARG full_refresh=0
ENV FULL_REFRESH=$full_refresh

CMD ["sh", "-c", "if [ \"$FULL_REFRESH\" = \"1\" ]; then dbt run --target $env --full-refresh; else dbt run --target $env; fi"]

Here, I can force a full refresh or target a specific environment so that my Docker container can be used in various situations and not only in production.
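
For example, building and running a production image with a full refresh might look roughly like this (the image name is just an example):

docker build --build-arg env=prod --build-arg full_refresh=1 -t antren-dbt .
docker run antren-dbt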

---
Last update: Mar 4, 2024