
Embucket/test-dbt-snowplow-web


test-dbt-snowplow-web

Test harness that runs the official dbt-snowplow-web package against Rustice — a Snowflake-protocol query engine — using the dbt-snowflake adapter.

It simulates a Snowplow RDB loader by ingesting one parquet object per cycle from the public bucket s3://embucket-testdata/. The default input is a single post-RDB-loader-shaped parquet object at dbt_snowplow_data/day_2022-08-21.parquet. It is small enough for SPCS smoke runs and still contains the web page context needed for non-empty Snowplow page view/session/user models.

Prerequisites

  1. Rustice running on localhost:3000:
    docker run --name rustice --rm -p 3000:3000 \
      -e CATALOG_URL="s3://mybucket/" \
      -e AWS_REGION=us-east-2 \
      embucket/rustice
    
  2. Python venv with the pinned deps:
    python3 -m venv .venv && source .venv/bin/activate
    pip install -r requirements.txt
    
  3. Environment variables:
    cp .env.example .env && source .env
    

Quick start

./make.sh bootstrap     # one time: creates DB/schema/events table, resets state
./make.sh cycle         # tick 1: load oldest unprocessed parquet, dbt run (full build)
./make.sh cycle         # tick 2: load next parquet, dbt run (incremental only)
./make.sh cycle         # tick N: ...

cycle accepts an optional batch size:

./make.sh cycle 10      # COPY INTO 10 parquet files in order, then a single dbt run

For backfills, prefer cycle N over running cycle in a shell loop: the N COPY INTOs share one dbt invocation, so a single incremental run processes the whole batch instead of one dbt run per file. State advances after each individual file, so a mid-batch crash leaves state.json coherent.

loader/state.json tracks the last loaded S3 key. When the bucket has no new files, load-next exits 0 with a "nothing to do" message, so cycle becomes idempotent. If cycle N runs out of candidates partway through, it loads what is available, prints "loaded K of N requested; no more files", and still runs dbt.
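The per-file state handling described above can be sketched as below. This is a minimal illustration under stated assumptions, not the code in loader/load_next.py: the `run_batch` helper, the `copy_into` callback, and the `{"last_loaded_key": ...}` layout of state.json are hypothetical. Writing state through a temporary file plus os.replace is one way to keep state.json coherent even if the process dies mid-write.

```python
import json
import os
import tempfile
from typing import Callable, Optional


def read_state(path: str) -> Optional[str]:
    """Return the last loaded S3 key, or None if state.json is missing or rewound."""
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f).get("last_loaded_key")


def write_state(path: str, key: str) -> None:
    """Replace state.json atomically so a crash never leaves a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump({"last_loaded_key": key}, f)
    os.replace(tmp, path)  # atomic rename within one filesystem


def run_batch(pending: list[str], n: int, state_path: str,
              copy_into: Callable[[str], None]) -> int:
    """Load up to n keys in order, persisting state after each successful COPY."""
    loaded = 0
    for key in pending[:n]:
        copy_into(key)                # raises on failure, so state is not advanced
        write_state(state_path, key)  # state moves forward one file at a time
        loaded += 1
    if loaded == 0:
        print("nothing to do")
    elif loaded < n:
        print(f"loaded {loaded} of {n} requested; no more files")
    return loaded
```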

Copy/paste runbooks

The harness supports three modes:

  • Local Rustice container is the default mode. dbt talks to http://127.0.0.1:3000 through the normal Snowflake connector settings in profiles.yml.
  • SPCS mode is opt-in with EMBUCKET_SPCS=1. dbt still uses the normal dbt-snowflake adapter, but sitecustomize.py patches the Snowflake connector to get short-lived SPCS ingress tokens from a regular Snowflake CLI profile.
  • Real Snowflake mode is opt-in with DBT_TARGET=snowflake. dbt and the loader scripts point at a real Snowflake account using username/password auth from env vars — see Running against real Snowflake.

1. Local Rustice container + dbt

In one terminal, start Rustice:

cd /path/to/test-dbt-snowplow-web
mkdir -p data/catalog

docker run --name rustice --rm -p 3000:3000 \
  -v "$(pwd)/data/catalog:/catalog:rw" \
  -e CATALOG_URL=file:///catalog \
  -e AWS_REGION=us-east-2 \
  embucket/rustice:latest

In another terminal, run the Snowplow harness:

cd /path/to/test-dbt-snowplow-web
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

cp .env.example .env
source .env

./make.sh bootstrap
./make.sh cycle --full-refresh
./make.sh cycle

Smoke query through Embucket CLI:

embucket-snow --config-file config.toml \
  sql -c embucket \
  -q "SELECT COUNT(*) FROM rustice_spcs.public_snowplow_manifest.events"

2. Deploy Rustice to SPCS

Create the Snowflake-managed Iceberg database and schemas once before the first deploy. The deploy script sets the required schema defaults, but it does not create arbitrary Horizon/dbt schemas for the workload:

snow --config-file /path/to/.snowflake/config.toml sql -c snowflake -q "
CREATE DATABASE IF NOT EXISTS RUSTICE_SPCS;
CREATE SCHEMA IF NOT EXISTS RUSTICE_SPCS.\"public_snowplow_manifest\";
CREATE SCHEMA IF NOT EXISTS RUSTICE_SPCS.\"public_snowplow_manifest_derived\";
CREATE SCHEMA IF NOT EXISTS RUSTICE_SPCS.\"public_snowplow_manifest_scratch\";
CREATE SCHEMA IF NOT EXISTS RUSTICE_SPCS.\"public_snowplow_manifest_snowplow_manifest\";
"

Then run the deploy from the Rustice repo, using the same Snowflake CLI profile that has permission to create the SPCS objects and grant the service role:

cd /path/to/rustice

SNOW_CONFIG_FILE=/path/to/.snowflake/config.toml \
SNOW_CONNECTION=snowflake \
RUSTICE_HORIZON_DATABASE=RUSTICE_SPCS \
RUSTICE_HORIZON_ROLE=<role-with-access-to-rustice-spcs> \
RUSTICE_GRANT_TO_ROLE=<role-used-by-snowflake-profile> \
RUSTICE_HORIZON_SCHEMAS=public_snowplow_manifest,public_snowplow_manifest_derived,public_snowplow_manifest_scratch,public_snowplow_manifest_snowplow_manifest \
RUSTICE_EGRESS_HOSTS="<account-identifier>.snowflakecomputing.com,s3.us-east-2.amazonaws.com,embucket-testdata.s3.us-east-2.amazonaws.com" \
RUSTICE_S3_REGION=us-east-2 \
RUSTICE_CLIENT_DATABASE=rustice_spcs \
RUSTICE_CLIENT_SCHEMA=public_snowplow_manifest \
RUSTICE_IMAGE_TAG=latest \
./deploy/spcs/deploy.sh

The script prints the service ingress host and writes deploy/spcs/generated/config.toml with an embucket_spcs profile.

3. Smoke checks through Snowflake CLI and Embucket CLI

Use regular Snowflake CLI to inspect the SPCS service and Snowflake-managed Iceberg objects:

snow --config-file /path/to/.snowflake/config.toml sql -c snowflake \
  -q "SELECT SYSTEM\$GET_SERVICE_STATUS('RUSTICE_APP.PUBLIC.RUSTICE_SERVICE')"

snow --config-file /path/to/.snowflake/config.toml sql -c snowflake \
  -q "SHOW SERVICE CONTAINERS IN SERVICE RUSTICE_APP.PUBLIC.RUSTICE_SERVICE"

Use Embucket CLI to query Rustice through the SPCS ingress:

embucket-snow --config-file /path/to/rustice/deploy/spcs/generated/config.toml \
  sql -c embucket_spcs \
  -q "SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()"

After the dbt bootstrap/load step creates the events table, both clients can inspect it:

embucket-snow --config-file /path/to/rustice/deploy/spcs/generated/config.toml \
  sql -c embucket_spcs \
  -q "SELECT COUNT(*) FROM rustice_spcs.public_snowplow_manifest.events"

snow --config-file /path/to/.snowflake/config.toml sql -c snowflake \
  -q "SELECT COUNT(*) FROM RUSTICE_SPCS.\"public_snowplow_manifest\".\"events\""

4. dbt through SPCS

Point the harness at the generated SPCS ingress profile. EMBUCKET_HOST is the host value from /path/to/rustice/deploy/spcs/generated/config.toml:

cd /path/to/test-dbt-snowplow-web
source .venv/bin/activate

export EMBUCKET_SPCS=1
export EMBUCKET_HOST="<service>.snowflakecomputing.app"
export EMBUCKET_PORT=443
export EMBUCKET_PROTOCOL=https
export EMBUCKET_ACCOUNT=embucket
export EMBUCKET_USER=embucket
export EMBUCKET_PASSWORD=embucket
export EMBUCKET_ROLE=SYSADMIN
export EMBUCKET_DATABASE=rustice_spcs
export EMBUCKET_WAREHOUSE=embucket
export EMBUCKET_SCHEMA=public_snowplow_manifest
export EMBUCKET_THREADS=1
export EMBUCKET_SPCS_TOKEN_CONNECTION=snowflake
export EMBUCKET_SPCS_TOKEN_CONFIG_FILE=/path/to/.snowflake/config.toml
export S3_BUCKET=embucket-testdata
export S3_PREFIX=dbt_snowplow_data/day_2022-08-21.parquet
export S3_REGION=us-east-2
export SNOWPLOW_START_DATE=2022-08-19
export SNOWPLOW_ENABLE_LOAD_TSTAMP=false
export DBT_PROFILES_DIR="$(pwd)"

./make.sh bootstrap
./make.sh cycle --full-refresh
./make.sh cycle

SPCS mode

When Rustice runs behind Snowpark Container Services public ingress, dbt still uses the normal dbt-snowflake adapter, but Python must patch snowflake-connector-python before dbt opens a connection. This repository does that automatically when EMBUCKET_SPCS=1 is set: make.sh adds the repo root to PYTHONPATH, sitecustomize.py installs the Embucket SPCS connector patch, and both dbt and the loader send the standard SPCS ingress header:

Authorization: Snowflake Token="<short-lived-spcs-token>"
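A minimal sketch of assembling that header from a token, assuming the token has already been obtained. The helper name is hypothetical; the real patch in sitecustomize.py also fetches the short-lived token from the Snowflake CLI profile, which this sketch leaves out.

```python
def spcs_auth_headers(token: str) -> dict[str, str]:
    """Build the SPCS ingress Authorization header for an already-issued token."""
    token = token.strip()
    if not token:
        raise ValueError("SPCS ingress token is empty")
    if '"' in token:
        # The token is embedded in a quoted string; a stray quote would corrupt the header.
        raise ValueError("SPCS ingress token must not contain double quotes")
    return {"Authorization": f'Snowflake Token="{token}"'}
```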

Deploy Rustice to SPCS first. For this workload, include all schemas that dbt creates so the Snowflake-managed Iceberg defaults can be configured before dbt starts creating tables:

cd /path/to/rustice

SNOW_CONFIG_FILE=/path/to/.snowflake/config.toml \
SNOW_CONNECTION=snowflake \
RUSTICE_HORIZON_DATABASE=RUSTICE_SPCS \
RUSTICE_HORIZON_ROLE=<role-with-access-to-rustice-spcs> \
RUSTICE_GRANT_TO_ROLE=<role-used-by-snowflake-profile> \
RUSTICE_HORIZON_SCHEMAS=public_snowplow_manifest,public_snowplow_manifest_derived,public_snowplow_manifest_scratch,public_snowplow_manifest_snowplow_manifest \
RUSTICE_EGRESS_HOSTS="<account-identifier>.snowflakecomputing.com,s3.us-east-2.amazonaws.com,embucket-testdata.s3.us-east-2.amazonaws.com" \
RUSTICE_S3_REGION=us-east-2 \
RUSTICE_CLIENT_DATABASE=rustice_spcs \
RUSTICE_CLIENT_SCHEMA=public_snowplow_manifest \
RUSTICE_IMAGE_TAG=latest \
./deploy/spcs/deploy.sh

<account-identifier> is the lower-case Snowflake account identifier that the deploy script uses to reach Horizon; the resulting host looks like <org>-<account>.snowflakecomputing.com. The extra embucket-testdata.s3.us-east-2.amazonaws.com host is needed for this harness because the Snowplow test data lives in an S3 bucket in us-east-2.
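The comma-separated RUSTICE_HORIZON_SCHEMAS and RUSTICE_EGRESS_HOSTS values follow a simple pattern, so they can be derived from the base schema, account identifier, region, and bucket. The helpers below are hypothetical and not part of either repo; the schema suffixes are the four schemas this harness creates (the same set ./make.sh reset drops against real Snowflake).

```python
def horizon_schemas(base_schema: str) -> list[str]:
    """Schemas the dbt-snowplow-web run creates for a given base schema."""
    suffixes = ["", "_derived", "_scratch", "_snowplow_manifest"]
    return [base_schema + suffix for suffix in suffixes]


def egress_hosts(account_identifier: str, region: str, bucket: str) -> list[str]:
    """Hosts the SPCS service must reach: Horizon, regional S3, and the test-data bucket."""
    return [
        f"{account_identifier.lower()}.snowflakecomputing.com",
        f"s3.{region}.amazonaws.com",
        f"{bucket}.s3.{region}.amazonaws.com",
    ]


def deploy_env(base_schema: str, account_identifier: str,
               region: str = "us-east-2", bucket: str = "embucket-testdata") -> dict[str, str]:
    """Render the comma-separated values passed to deploy/spcs/deploy.sh."""
    return {
        "RUSTICE_HORIZON_SCHEMAS": ",".join(horizon_schemas(base_schema)),
        "RUSTICE_EGRESS_HOSTS": ",".join(egress_hosts(account_identifier, region, bucket)),
        "RUSTICE_S3_REGION": region,
    }
```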

If the Snowplow schemas do not exist yet in the Horizon database, create them once from regular Snowflake SQL before running the deploy script so the deploy script's ALTER SCHEMA ... SET EXTERNAL_VOLUME = 'SNOWFLAKE_MANAGED' statements have something to update:

snow --config-file /path/to/.snowflake/config.toml sql -c snowflake -q "
CREATE DATABASE IF NOT EXISTS RUSTICE_SPCS;
CREATE SCHEMA IF NOT EXISTS RUSTICE_SPCS.\"public_snowplow_manifest\";
CREATE SCHEMA IF NOT EXISTS RUSTICE_SPCS.\"public_snowplow_manifest_derived\";
CREATE SCHEMA IF NOT EXISTS RUSTICE_SPCS.\"public_snowplow_manifest_scratch\";
CREATE SCHEMA IF NOT EXISTS RUSTICE_SPCS.\"public_snowplow_manifest_snowplow_manifest\";
"

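The schemas are quoted intentionally. Rustice normalizes unquoted SQL identifiers to lowercase, while Snowflake stores unquoted schema names in uppercase. Quoting makes Snowflake store the lowercase names exactly, so they match the schemas Rustice and dbt refer to. Inside the double-quoted shell string the identifier quotes must be escaped as \", otherwise the shell strips them before Snowflake sees the SQL.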
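A simplified model of the two folding rules, to make the mismatch concrete. It ignores other identifier rules (for example, Snowflake's QUOTED_IDENTIFIERS_IGNORE_CASE parameter) and only shows why the quoted Snowflake name and the unquoted Rustice name resolve to the same string. The function is hypothetical.

```python
def resolve_identifier(name: str, engine: str) -> str:
    """Simplified model of how each engine stores an identifier.

    Quoted identifiers keep their exact case in both engines. Unquoted ones
    are folded: upper case in Snowflake, lower case in Rustice.
    """
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name[1:-1].replace('""', '"')  # doubled quotes escape a literal quote
    if engine == "snowflake":
        return name.upper()
    if engine == "rustice":
        return name.lower()
    raise ValueError(f"unknown engine: {engine}")
```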

The deploy script writes an Embucket CLI config similar to:

[connections.embucket_spcs]
host = "<service>.snowflakecomputing.app"
protocol = "https"
port = 443
spcs_token_connection = "snowflake"
spcs_token_config_file = "/path/to/.snowflake/config.toml"

Export matching env vars for dbt. The token source is a normal Snowflake CLI profile; the token is issued in memory and no daily token file is needed:

cd /path/to/test-dbt-snowplow-web
source .venv/bin/activate

export EMBUCKET_SPCS=1
export EMBUCKET_HOST="<service>.snowflakecomputing.app"
export EMBUCKET_PORT=443
export EMBUCKET_PROTOCOL=https
export EMBUCKET_ACCOUNT=embucket
export EMBUCKET_USER=embucket
export EMBUCKET_PASSWORD=embucket
export EMBUCKET_ROLE=SYSADMIN
export EMBUCKET_DATABASE=rustice_spcs
export EMBUCKET_WAREHOUSE=embucket
export EMBUCKET_SCHEMA=public_snowplow_manifest
export EMBUCKET_THREADS=1
export EMBUCKET_SPCS_TOKEN_CONNECTION=snowflake
export EMBUCKET_SPCS_TOKEN_CONFIG_FILE=/path/to/.snowflake/config.toml
export S3_BUCKET=embucket-testdata
export S3_PREFIX=dbt_snowplow_data/day_2022-08-21.parquet
export S3_REGION=us-east-2
export SNOWPLOW_START_DATE=2022-08-19
export SNOWPLOW_ENABLE_LOAD_TSTAMP=false
export DBT_PROFILES_DIR="$(pwd)"

EMBUCKET_THREADS=1 is the current SPCS smoke-test default. It keeps dbt from opening several Snowflake-compatible sessions at the same time while the SPCS ingress token/session behavior is still being hardened.

Then run the same harness:

./make.sh bootstrap
./make.sh cycle --full-refresh
./make.sh cycle

Use embucket-snow or regular Snowflake SQL to inspect status:

embucket-snow --config-file ../rustice/deploy/spcs/generated/config.toml \
  sql -c embucket_spcs \
  -q "SELECT COUNT(*) FROM rustice_spcs.public_snowplow_manifest.events"

snow --config-file /path/to/.snowflake/config.toml sql -c snowflake \
  -q "SHOW SERVICE CONTAINERS IN SERVICE RUSTICE_APP.PUBLIC.RUSTICE_SERVICE"

For local iteration without S3 / network, see Dev mode.

Running against real Snowflake

The same dbt project and loader scripts also run against a real Snowflake account. Switch by exporting DBT_TARGET=snowflake and the SNOWFLAKE_* credentials — the rustice (Embucket) target stays the default, so existing flows are unaffected.

Prerequisites

  • A Snowflake account with a user that can authenticate via password.
  • An existing database and warehouse. The loader's bootstrap.py only creates the schema, the events table, and an external stage. It does not create the database or warehouse (those typically need ACCOUNTADMIN/SYSADMIN grants that are out of scope here).
  • Your role needs CREATE SCHEMA on the database, plus USAGE on the warehouse. The external stage points at the public bucket s3://embucket-testdata/, so no AWS credentials or storage integration are required.

Setup

cp .env.example .env
# Uncomment and fill in the "Real Snowflake mode" block in .env:
#   export DBT_TARGET=snowflake
#   export SNOWFLAKE_ACCOUNT=...
#   export SNOWFLAKE_USER=...
#   export SNOWFLAKE_PASSWORD=...
#   export SNOWFLAKE_ROLE=SYSADMIN
#   export SNOWFLAKE_DATABASE=...
#   export SNOWFLAKE_WAREHOUSE=...
#   export SNOWFLAKE_SCHEMA=snowplow_manifest
source .env

SNOWFLAKE_PASSWORD is read from the environment by both profiles.yml (via dbt's env_var()) and the loader (loader/_connection.py). Keep .env out of version control — .gitignore already excludes it.

Bootstrap and run

./make.sh bootstrap     # creates schema + events table + external stage
./make.sh cycle         # tick 1: COPY INTO from @snowplow_input_stage, dbt run
./make.sh cycle         # tick 2: next parquet, incremental dbt run

dbt and the loader both honor DBT_TARGET, so dbt run, dbt seed, and dbt test invoked from make.sh dbt-run automatically use the snowflake target in profiles.yml.

Notes

  • Stricter column matching. Real Snowflake enforces MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE strictly; any column drift between the input parquet and events_ddl.sql fails the COPY INTO loudly, whereas Rustice may be more permissive.
  • Tearing down. ./make.sh reset drops <schema>, <schema>_derived, <schema>_scratch, <schema>_snowplow_manifest, and the snowplow_input_stage. The database and warehouse are left alone.

Recording reference results for rustice SLT tests

rustice/crates/sqllogictest/tests/slt/dbt_snowplow_web/ contains 36 SLT files (18 incremental + 18 full_refresh) whose verification blocks are filled with TODO placeholders. The workflow below runs the dbt-snowplow-web DAG on real Snowflake against a fixed 2-file parquet fixture and emits the captured query results as paste-ready blocks.

./make.sh bootstrap     # schema + events table + external stage
./make.sh load-test     # TRUNCATE + COPY both snowplow/test/events*.parquet
./make.sh dbt-run       # cold-table run covers both modes
./make.sh capture-slt   # writes loader/slt_results.txt (72 blocks)

Open loader/slt_results.txt and paste each === <subdir>/<file> === block over the corresponding # TODO(snowflake-verify) block in the rustice file. The recorder writes each result twice, once for incremental/ and once for full_refresh/, because dbt's first run on a cold table is identical between the two materialization modes.
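If you would rather script the paste than do it by hand, the recorder output can first be split into per-file blocks. This sketch assumes each header sits alone on a line in exactly the === <subdir>/<file> === form and that a block runs until the next header; the function name is hypothetical and it does not edit the rustice SLT files.

```python
import re

HEADER = re.compile(r"^=== (?P<path>\S+) ===$")


def split_slt_blocks(text: str) -> dict[str, str]:
    """Split slt_results.txt into {"<subdir>/<file>": block body}, in file order."""
    blocks: dict[str, list[str]] = {}
    current = None
    for line in text.splitlines():
        match = HEADER.match(line)
        if match:
            current = match.group("path")
            blocks[current] = []
        elif current is not None:
            blocks[current].append(line)
    # Drop trailing blank lines so each body can be pasted verbatim.
    return {path: "\n".join(lines).rstrip("\n") for path, lines in blocks.items()}
```

Locating each # TODO(snowflake-verify) marker in the matching .slt file and replacing it is left out here.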

make.sh targets

Target          Action
bootstrap       Create database, schema, events table, fresh state.json
deps            dbt deps (installs snowplow_web from Hub)
load-next [N]   COPY INTO events from the next N parquet files on S3 (or local fs in dev mode); default 1
load-test       TRUNCATE events + COPY both snowplow/test/events*.parquet for SLT regeneration (Snowflake only)
dbt-run         dbt deps + dbt seed --full-refresh + dbt run + dbt test
cycle [N]       load-next N, then dbt-run: one simulated loader tick over N files; default 1
capture-slt     Run the rustice dbt_snowplow_web TODO verification queries and write loader/slt_results.txt
reset           Drop derived/scratch/manifest schemas, rewind state to null
clean           Remove target/ and dbt_packages/

To force a full rebuild of every model (e.g. after a reset), pass --full-refresh through to dbt:

./make.sh dbt-run --full-refresh

Do not invoke dbt run --full-refresh directly — dbt run does not materialize seeds, so the snowplow_web package's *_dim_* seeds (e.g. snowplow_web_dim_ga4_source_categories) will be missing and downstream models will fail with table … not found. The dbt-run target wraps dbt seed --full-refresh + dbt run together so the seeds always exist.
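The order of operations in the dbt-run target can be written out as plain command lists. The flag forwarding is an assumption: this sketch passes extra arguments such as --full-refresh to dbt run only, which may differ from what make.sh actually does, and the function name is hypothetical.

```python
import shlex


def dbt_run_commands(extra_args: list[str]) -> list[list[str]]:
    """Commands a dbt-run style wrapper would execute, in order.

    Assumption: extra args such as --full-refresh go to `dbt run` only.
    Seeds are always rebuilt so the snowplow_web *_dim_* seed tables exist.
    """
    return [
        ["dbt", "deps"],
        ["dbt", "seed", "--full-refresh"],
        ["dbt", "run", *extra_args],
        ["dbt", "test"],
    ]


if __name__ == "__main__":
    for cmd in dbt_run_commands(["--full-refresh"]):
        # A real wrapper would call subprocess.run(cmd, check=True) here.
        print(shlex.join(cmd))
```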

How loading works

loader/load_next.py:

  1. Lists s3://$S3_BUCKET/$S3_PREFIX anonymously via boto3 (UNSIGNED).

  2. Picks the smallest *.parquet key strictly greater than state.last_loaded_key.

  3. Issues, against Rustice via the Snowflake protocol:

    COPY INTO public_snowplow_manifest.events
    FROM 's3://embucket-testdata/<key>'
    FILE_FORMAT = (TYPE = PARQUET)
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

Against Rustice, no storage integration or external volume is created: Rustice reads the public bucket anonymously. If the COPY fails, fix Rustice's S3 client config; do not fall back to local staging.
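The three steps can be sketched in Python as below. The function names are hypothetical and this is not the code in loader/load_next.py. The boto3 calls (an anonymous client via botocore.UNSIGNED and the list_objects_v2 paginator) are standard boto3 usage, and "smallest key" is read as plain lexicographic string order, which is what makes date-stamped names such as day_2022-08-21.parquet load chronologically.

```python
from typing import Iterable, Optional


def list_parquet_keys(bucket: str, prefix: str, region: str) -> list[str]:
    """List every key under s3://bucket/prefix without credentials (needs boto3)."""
    import boto3  # imported lazily so the pure helpers below work without boto3
    from botocore import UNSIGNED
    from botocore.config import Config

    s3 = boto3.client("s3", region_name=region, config=Config(signature_version=UNSIGNED))
    keys: list[str] = []
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys


def next_keys(keys: Iterable[str], last_loaded_key: Optional[str], n: int = 1) -> list[str]:
    """Return up to n *.parquet keys strictly greater than last_loaded_key, smallest first."""
    candidates = sorted(k for k in keys if k.endswith(".parquet"))
    if last_loaded_key is not None:
        candidates = [k for k in candidates if k > last_loaded_key]
    return candidates[:n]


def copy_into_sql(table: str, bucket: str, key: str) -> str:
    """Render the COPY INTO statement for one object (assumes the key has no quotes)."""
    return (
        f"COPY INTO {table}\n"
        f"FROM 's3://{bucket}/{key}'\n"
        "FILE_FORMAT = (TYPE = PARQUET)\n"
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;"
    )
```

With the default S3_PREFIX, which names a single object, the listing returns just that key (plus any keys that extend it).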

Schema reconciliation

The default dbt_snowplow_data/day_2022-08-21.parquet object mirrors the post-RDB-loader events layout closely enough for the core web package. It contains CONTEXTS_COM_SNOWPLOWANALYTICS_SNOWPLOW_WEB_PAGE_1, the split-out web page context that dbt uses to derive page_view_id.

MATCH_BY_COLUMN_NAME reconciles the parquet columns with the events table definition (events_ddl.sql), matching names case-insensitively:

  • Web page context split-out (contexts_com_snowplowanalytics_snowplow_web_page_1) is loaded from the fixture and keeps page_view_id non-null for page view and page ping events.
  • Optional split-out columns (load_tstamp, IAB / UA / YAUAA / consent / CWV contexts) have no data in the fixture, so they stay NULL.

Because these split-out columns stay NULL, the corresponding optional features of the dbt-snowplow-web package would produce empty or failing output, so dbt_project.yml disables them:

  • snowplow__enable_consent: false
  • snowplow__enable_cwv: false
  • snowplow__enable_iab: false
  • snowplow__enable_ua: false
  • snowplow__enable_yauaa: false
  • snowplow__enable_load_tstamp: false
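The column reconciliation described above can be modeled in a few lines. This is a simplified sketch, not Snowflake's or Rustice's implementation: it matches names case-insensitively, returns None (NULL) for table columns the file lacks, and simply ignores file columns with no table counterpart. The function name and the EXTRA column in the usage check are made up for illustration.

```python
def match_by_column_name(row: dict[str, object], table_columns: list[str]) -> dict[str, object]:
    """Map one parquet row onto the table's columns, ignoring case.

    Table columns absent from the file come back as None (NULL);
    file columns with no matching table column are dropped.
    """
    by_lower = {name.lower(): value for name, value in row.items()}
    return {col: by_lower.get(col.lower()) for col in table_columns}
```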

The harness exercises the core models — snowplow_web_sessions, snowplow_web_page_views, snowplow_web_users, and the manifest / incremental machinery. That's the surface most relevant to validating Rustice's Snowflake compatibility.

How dbt-snowplow-web is wired

  • Installed from dbt Hub via packages.yml (snowplow/snowplow_web).
  • Configured via vars.snowplow_web.* in dbt_project.yml.
  • The atomic events source is declared in models/sources.yml as source('atomic', 'events'), mapped to $EMBUCKET_DATABASE.$EMBUCKET_SCHEMA.events via snowplow__atomic_schema / snowplow__events.

Watch out: SNOWPLOW_START_DATE must fall on or shortly before the earliest session timestamp in the first parquet you load. The package's first run only processes a limited backfill window starting at that date, so a start date after the data, or too far before it, yields empty models. The default test object starts on 2022-08-21, and the default SNOWPLOW_START_DATE is 2022-08-19.
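A rough way to sanity-check a start date before the first run. This sketch assumes the first run covers events from snowplow__start_date up to snowplow__backfill_limit_days later, and assumes 30 days for that variable; check the package defaults and dbt_project.yml before relying on it. It ignores time of day and the package's lookback handling, and the function names are hypothetical.

```python
from datetime import date, timedelta


def first_run_window(start_date: date, backfill_limit_days: int = 30) -> tuple[date, date]:
    """Approximate [lower, upper) event window for the package's first run (assumed model)."""
    return start_date, start_date + timedelta(days=backfill_limit_days)


def start_date_covers(first_event_day: date, start_date: date,
                      backfill_limit_days: int = 30) -> bool:
    """True if the first loaded parquet's earliest day falls inside the first-run window."""
    lower, upper = first_run_window(start_date, backfill_limit_days)
    return lower <= first_event_day < upper
```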

Verifying

After ./make.sh cycle:

SELECT COUNT(*) FROM rustice_spcs.public_snowplow_manifest.events;
SELECT model, last_success
FROM rustice_spcs.public_snowplow_manifest_snowplow_manifest.snowplow_web_incremental_manifest;
SELECT COUNT(*) FROM rustice_spcs.public_snowplow_manifest_derived.snowplow_web_page_views;
SELECT COUNT(*) FROM rustice_spcs.public_snowplow_manifest_derived.snowplow_web_sessions;
SELECT COUNT(*) FROM rustice_spcs.public_snowplow_manifest_derived.snowplow_web_users;

events row count should grow by exactly the loaded file's row count each tick; last_success per model should advance after every successful tick. With the default fixture, expect events = 85, snowplow_web_sessions = 11, snowplow_web_page_views = 8, and snowplow_web_users = 11.
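To turn these checks into assertions, the counts returned by the queries above can be compared against the documented expectations. Collecting the counts (for example from a snowflake-connector-python cursor) is left out, and the helper names are hypothetical.

```python
EXPECTED_DEFAULT_FIXTURE = {
    "events": 85,
    "snowplow_web_sessions": 11,
    "snowplow_web_page_views": 8,
    "snowplow_web_users": 11,
}


def count_mismatches(actual: dict[str, int], expected: dict[str, int]) -> dict[str, tuple]:
    """Return {table: (expected, actual)} for every table whose count differs or is missing."""
    return {
        table: (want, actual.get(table))
        for table, want in expected.items()
        if actual.get(table) != want
    }


def events_grew_correctly(before: int, after: int, loaded_file_rows: int) -> bool:
    """Per-tick invariant: events grows by exactly the loaded file's row count."""
    return after - before == loaded_file_rows
```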

Query snapshots

queries/full-refresh/ and queries/incremental/ hold the SELECT bodies of every dbt-snowplow-web model, as compiled by dbt — one tree per mode. They mirror the models/ subtree under target/compiled/snowplow_web/, so paths correspond 1:1 between the two trees and diff -r between them highlights incremental-only branches.

These are SELECTs only — no CREATE TABLE AS wrapper, no MERGE/INSERT glue. Dropping any of them into a CREATE TABLE … AS (…) is enough to materialize that model.

To regenerate (Rustice in dev mode):

# clean state — drop all snowplow_web tables (CASCADE doesn't drop tables in
# Rustice, so each table must go individually)
./make.sh bootstrap                             # fresh-install state
dbt compile                                     # → full-refresh mode
# copy target/compiled/snowplow_web/models/**/*.sql (excluding *.yml/ test dirs)
# into queries/full-refresh/

./make.sh cycle                                 # populate manifest + derived tables
dbt compile                                     # → incremental mode
# same copy into queries/incremental/
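The manual copy step can be scripted. This is a hypothetical helper, not part of the repo. It keeps paths relative to target/compiled/snowplow_web/models so the 1:1 correspondence with queries/ holds, and it reads "excluding *.yml/ test dirs" as skipping any directory whose name ends in .yml, which is where dbt typically places compiled schema tests.

```python
import shutil
from pathlib import Path


def snapshot_compiled(compiled_root: Path, dest: Path) -> list[Path]:
    """Copy compiled model SELECTs from compiled_root into dest, keeping relative paths."""
    copied: list[Path] = []
    for src in sorted(compiled_root.rglob("*.sql")):
        rel = src.relative_to(compiled_root)
        # Compiled schema tests live under a directory named after their .yml file.
        if any(part.endswith(".yml") for part in rel.parts[:-1]):
            continue
        target = dest / rel
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, target)
        copied.append(rel)
    return copied
```

For example, snapshot_compiled(Path("target/compiled/snowplow_web/models"), Path("queries/full-refresh")) after the first dbt compile.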

The --full-refresh flag does not change the compiled SELECT — only the manifest state at compile time does. Compiling against an empty manifest yields the full-refresh form (lower bound = snowplow__start_date); compiling against a populated manifest yields the incremental form (lower bound = last_success minus the configured lookback window).
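The rule above can be expressed as a small function. It is a simplification for illustration: the package computes its bounds in SQL and handles more cases (for example, the upper bound and per-model manifest entries), and the 6-hour default assumed here for snowplow__lookback_window_hours should be checked against the package defaults and dbt_project.yml. The function name is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional


def run_lower_bound(start_date: datetime, last_success: Optional[datetime],
                    lookback_window_hours: int = 6) -> datetime:
    """Lower bound baked into the compiled SELECT for the current manifest state.

    Empty manifest (last_success is None): full-refresh form, bound = start date.
    Populated manifest: incremental form, bound = last_success minus the lookback.
    """
    if last_success is None:
        return start_date
    return last_success - timedelta(hours=lookback_window_hours)
```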

The diff between the two trees is concentrated in base/scratch/snowplow_web_base_new_event_limits.sql (where the lookback math lives) and the three base models that copy the bounds into their own SELECT (snowplow_web_base_sessions_lifecycle_manifest, snowplow_web_base_events_this_run, snowplow_web_base_sessions_this_run). Downstream models reference the limits table at runtime instead of baking the bounds in, so they're byte-identical between trees. Resolved table names and date constants are baked in, so these are environment-specific snapshots, not portable scripts.

Dev mode (local filesystem)

For local iteration without S3 / network, set DEV=1 and the loader reads parquet from $LOCAL_PARQUET_DIR (default ./data/snowplow/) and issues COPY INTO ... FROM 'file:///...' instead of s3://.... How parquet files land in that directory is out of scope for this harness — assume they're already there.

Start Rustice with its iceberg-file-catalog rooted under ./data/catalog/, and bind-mount ./data into the container at the same host path so the absolute file:// URLs the loader emits resolve identically inside the container:

mkdir -p data/snowplow data/catalog
docker run --name rustice --rm -p 3000:3000 \
  -v "$(pwd)/data:$(pwd)/data:rw" \
  -e CATALOG_URL="file://$(pwd)/data/catalog" \
  -e BUCKET_HOST=0.0.0.0 \
  embucket/rustice

CATALOG_URL with a file: scheme switches Rustice into dev-catalog mode, which is what enables COPY INTO ... FROM 'file://...'. The single data/ mount holds both the parquet inputs (data/snowplow/) and Rustice's iceberg-catalog metadata (data/catalog/).
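The path handling can be illustrated with pathlib: the loader emits an absolute file:// URL built from the host path, which is why the container needs ./data mounted at that same path. This is a sketch with a hypothetical function name; the FILE_FORMAT and MATCH_BY_COLUMN_NAME clauses are assumed to match the S3 form shown under How loading works.

```python
from pathlib import Path


def dev_copy_sql(table: str, parquet_path: str) -> str:
    """COPY INTO for dev mode, reading a local parquet file via an absolute file:// URL.

    The URL embeds the host's absolute path, so the container must see the
    same path for Rustice to open the file.
    """
    url = Path(parquet_path).resolve().as_uri()  # e.g. file:///home/me/repo/data/snowplow/x.parquet
    return (
        f"COPY INTO {table}\n"
        f"FROM '{url}'\n"
        "FILE_FORMAT = (TYPE = PARQUET)\n"
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;"
    )
```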

Uncomment the DEV=1 / LOCAL_PARQUET_DIR lines in .env, then:

source .env
./make.sh bootstrap
./make.sh cycle
