Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions experimental/databricks-spark-structured-streaming/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ df.writeStream \
| Pattern | Description | Reference |
|---------|-------------|-----------|
| **Kafka Streaming** | Kafka to Delta, Kafka to Kafka, Real-Time Mode | See [references/kafka-streaming.md](references/kafka-streaming.md) |
| **Real-Time Mode (RTM)** | Sub-second E2E latency — cluster setup, slot math, supported ops, `transformWithState`, observability, delivery semantics | See [references/real-time-mode.md](references/real-time-mode.md) |
| **Lakebase `foreach` Sink** | Write streaming records into Lakebase Postgres with transactional upserts (the canonical sink for an RTM realtime app) | See [references/lakebase-sink-python.md](references/lakebase-sink-python.md) |
| **Stream Joins** | Stream-stream joins, stream-static joins | See [references/stream-stream-joins.md](references/stream-stream-joins.md), [references/stream-static-joins.md](references/stream-static-joins.md) |
| **Multi-Sink Writes** | Write to multiple tables, parallel merges | See [references/multi-sink-writes.md](references/multi-sink-writes.md) |
| **Merge Operations** | MERGE performance, parallel merges, optimizations | See [references/merge-operations.md](references/merge-operations.md) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,14 @@ query = (enriched_df
Enrich events with dimension data:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we recommend RTM for kafka->kafka streams? I think it's better than microbatch for this case.


```python
from pyspark.sql.functions import broadcast, col, to_json, struct

# Read reference data (Delta table - auto-refreshed each microbatch)
user_dim = spark.table("users.dimension")

# Stream-static join for enrichment
enriched = (parsed_df
.join(user_dim, "user_id", "left")
.join(broadcast(user_dim), "user_id", "left") # broadcast() required: RTM only supports broadcast stream-static joins
.withColumn("enriched_value", to_json(struct(
col("event_id"),
col("user_id"),
Expand All @@ -192,7 +194,8 @@ enriched.select(col("key"), col("enriched_value").alias("value")).writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", brokers) \
.option("topic", "enriched-events") \
.trigger(realTime=True) \
.outputMode("update") \
.trigger(realTime="5 minutes") \
.option("checkpointLocation", "/checkpoints/enrichment") \
.start()
```
Expand Down Expand Up @@ -231,8 +234,8 @@ def route_events(batch_df, batch_id):
.save()

parsed_df.writeStream \
.foreachBatch(route_events) \
.trigger(realTime=True) \
.foreachBatch(route_events) \ # foreachBatch is NOT supported in RTM — use processingTime

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment is probably not necessary here, although it's useful for the reviewer. :)

.trigger(processingTime="30 seconds") \
.option("checkpointLocation", "/checkpoints/routing") \
.start()
```
Expand Down Expand Up @@ -281,8 +284,8 @@ def validate_and_route(batch_df, batch_id):
.save()

source_df.writeStream \
.foreachBatch(validate_and_route) \
.trigger(realTime=True) \
.foreachBatch(validate_and_route) \ # foreachBatch is NOT supported in RTM — use processingTime

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same. Comment not necessary.

.trigger(processingTime="30 seconds") \
.option("checkpointLocation", "/checkpoints/validation") \
.start()
```
Expand Down Expand Up @@ -352,7 +355,7 @@ df.writeStream \
| minPartitions | Match Kafka partitions | Optimal parallelism |
| maxOffsetsPerTrigger | 10,000-100,000 | Balance latency vs throughput |
| trigger interval | Business SLA / 3 | Recovery time buffer |
| RTM | Only if < 800ms required | Microbatch more cost-effective |
| RTM | When sub-second E2E latency is required | Micro-batch is more cost-effective for second-or-longer SLAs |

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend RTM for kafka -> kafka?


## Monitoring

Expand Down Expand Up @@ -391,7 +394,7 @@ for stream in spark.streams.active:
| Issue | Cause | Solution |
|-------|-------|----------|
| **No data being read** | `startingOffsets` default is "latest" | Use "earliest" for existing data |
| **High latency** | Microbatch overhead | Use RTM (trigger(realTime=True)) |
| **High latency** | Micro-batch overhead | Use RTM (`trigger(realTime="5 minutes")`) — see [real-time-mode.md](real-time-mode.md) |
| **Consumer lag** | Processing < Input rate | Scale cluster; reduce maxOffsetsPerTrigger |
| **Duplicate messages** | Exactly-once not configured | Enable idempotent producer (acks=all) |
| **Falling behind** | Processing < Input rate | Increase cluster size |
Expand All @@ -402,7 +405,7 @@ for stream in spark.streams.active:
- [ ] Checkpoint location is persistent (UC volumes, not DBFS)
- [ ] Unique checkpoint per pipeline
- [ ] Fixed-size cluster (no autoscaling for streaming/RTM)
- [ ] RTM enabled only if latency < 800ms required
- [ ] RTM enabled only when sub-second E2E latency is required

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too strong? Good for kafka -> kafka

- [ ] Consumer lag monitored and alerts configured
- [ ] Producer acks=all for durability
- [ ] Schema validation with DLQ configured
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
---
name: lakebase-sink-python
description: Write streaming records into Lakebase Postgres from RTM (or any Structured Streaming pipeline) using a Python foreach sink. Use when building realtime apps that serve from Lakebase, or any low-latency pipeline that needs transactional upserts into Postgres.
---

# Lakebase `foreach` Sink (Python)

A Python `foreach` sink that writes streaming records into Lakebase Postgres with low latency, transactional upserts, and per-partition connection lifecycle. The canonical sink shape for an RTM streaming app whose serving layer is Lakebase.

For general Lakebase mechanics — projects, branches, sizing, the CU-to-connections cap, authentication methods, connection patterns from non-streaming clients — see the stable `databricks-lakebase` skill, specifically [connectivity.md](../../../skills/databricks-lakebase/references/connectivity.md) and [computes-and-scaling.md](../../../skills/databricks-lakebase/references/computes-and-scaling.md). This file covers only the streaming-sink-specific pattern.

## Why this pattern (and not the alternatives)

There is no native Structured Streaming connector for Lakebase / Postgres in low-latency mode. The available alternatives:

- **`foreachBatch + df.write.jdbc()`** — NOT appropriate for RTM. `foreachBatch` is unsupported in RTM, and batches by micro-batch boundary, which RTM doesn't really have.
- **`foreach` with a custom Python sink** — the right primitive. Per-row lifecycle, per-partition connection.

## Two non-obvious rules

1. **The sink class is duck-typed in Python — no base class.** Official Databricks docs show the foreach pattern as a Scala `ForeachWriter` subclass. Translate to Python — `foreach()` accepts duck-typed objects with `open`/`process`/`close` methods, so no base class is involved. The `pyspark.sql.streaming.ForeachWriter` symbol is a Scala class with no exported Python equivalent.
2. **For the executor-side sink, use native Postgres password auth, not OAuth.** OAuth refresh requires Databricks SDK context that executors don't have — executors are separate Python processes with no SDK state. Use the Postgres role's password, pulled from a Databricks secret scope at driver startup and serialized into the sink instance. (Driver-side admin setup — `CREATE ROLE`, `CREATE DATABASE`, `GRANT` — is different; that runs on the driver where the SDK is available, so use an SDK-minted JWT for those.) See [connectivity.md](../../../skills/databricks-lakebase/references/connectivity.md) for the two auth methods.

## Prerequisites

- A Lakebase Autoscaling project, branch, and endpoint. Default database is `databricks_postgres`.
- The Lakebase instance must have **native Postgres password login enabled** at create time. Without it, `CREATE ROLE … PASSWORD` succeeds but the Postgres wire-protocol login from executors is rejected.
- A Postgres role with a password (created via driver-side admin connection), and the password stored in a Databricks secret scope.

## The `LakebaseSink` class

```python
import time

import psycopg


# PySpark's `foreach()` accepts any object that implements the
# `open(partition_id, epoch_id) -> bool`, `process(row)`, `close(error)`
# lifecycle. There is no exported base class to extend — the
# `pyspark.sql.streaming.ForeachWriter` symbol is a Scala class, not a
# Python one. Duck-typing is the documented Python pattern.
class LakebaseSink:
"""
Streaming sink that upserts rows into a Lakebase Postgres table.

Buffers in-memory and commits in time-based windows. Each process(row)
appends to the buffer and flushes only if at least max_dwell_ms have
passed since the last flush. Effective Lakebase TX/s ceiling per
partition is 1000 / max_dwell_ms; at higher source rates more rows
accumulate per flush.

Uses INSERT ... ON CONFLICT for upsert semantics. One transaction per
flush. Authenticates with Postgres password — OAuth is incompatible
with executor-side custom sinks.
"""

def __init__(self, host: str, database: str, user: str, password: str,
table: str, port: int = 5432, key_col: str = "id",
max_dwell_ms: int = 10):
self.host = host
self.database = database
self.user = user
self.password = password
self.table = table
self.port = port
self.key_col = key_col
self.max_dwell_ms = max_dwell_ms

def open(self, partition_id, epoch_id):
self.conn = psycopg.connect(
host=self.host, port=self.port, dbname=self.database,
user=self.user, password=self.password,
sslmode="require", autocommit=False,
)
self.buffer = []
self.last_flush_ms = self._now_ms()
return True

def process(self, row):
self.buffer.append(row)
if self._now_ms() - self.last_flush_ms >= self.max_dwell_ms:
self._flush()

def close(self, error):
try:
if self.buffer:
self._flush()
finally:
if self.conn is not None:
self.conn.close()
self.conn = None

def _flush(self):
if not self.buffer:
return
sample = self.buffer[0].asDict()
cols = list(sample.keys())
placeholders = ", ".join(["%s"] * len(cols))
col_list = ", ".join(f'"{c}"' for c in cols)
update_set = ", ".join(
f'"{c}" = EXCLUDED."{c}"' for c in cols if c != self.key_col
)
sql = (
f'INSERT INTO "{self.table}" ({col_list}) '
f'VALUES ({placeholders}) '
f'ON CONFLICT ("{self.key_col}") DO UPDATE SET {update_set}'
)
try:
with self.conn.cursor() as cur:
rows = [tuple(r.asDict()[c] for c in cols) for r in self.buffer]
cur.executemany(sql, rows)
self.conn.commit()
except Exception:
self.conn.rollback()
raise
finally:
self.buffer.clear()
self.last_flush_ms = self._now_ms()

@staticmethod
def _now_ms():
return int(time.monotonic() * 1000)
```

## Wiring into a stream

```python
# Set defensively at the top — required before any .repartition(N) in RTM
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

sink = LakebaseSink(
host=dbutils.secrets.get("lakebase", "host"),
database="databricks_postgres",
user=dbutils.secrets.get("lakebase", "user"),
password=dbutils.secrets.get("lakebase", "password"),
table="events",
key_col="id",
max_dwell_ms=10,
)

stream = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(value AS STRING) AS id", "timestamp")
.writeStream
.foreach(sink)
.outputMode("update")
.option("checkpointLocation", checkpoint_path)
.trigger(realTime="5 minutes") # checkpoint interval, not trigger interval
.start()
)
```

See [real-time-mode.md](real-time-mode.md) for the RTM cluster setup and the `outputMode("update")` requirement; see [kafka-streaming.md](kafka-streaming.md) for source-specific Kafka options.

## Delivery semantics

The `foreach` sink is **at-least-once** — see "Delivery semantics" in [real-time-mode.md](real-time-mode.md) for why and when duplicates occur.

The supplied `LakebaseSink` handles this correctly because the upsert is idempotent — `INSERT … ON CONFLICT (id) DO UPDATE` produces the same final state regardless of how many times the same row arrives. If you customize the sink, **preserve idempotency**: keep the `ON CONFLICT` upsert (or use `MERGE`), never plain `INSERT`. If you swap in append-only inserts (event log style), include a deduplication key the consumer can use.

## Tuning `max_dwell_ms`

`max_dwell_ms = 10` is the default. It controls the minimum interval between Postgres commits.

| `max_dwell_ms` | Latency to Postgres | Max sustained TX/s per partition | When to use |
|---|---|---|---|
| 1 | Lowest (~1 ms) | ~1000 | Tightest demos; cheap workloads only |
| 10 (default) | Low (~10 ms) | ~100 | Most realtime apps |
| 100 | Moderate | ~10 | High source rates where per-commit overhead matters more than per-row latency |
| 1000 | Visible (~1 s) | ~1 | Effectively micro-batch — only if Lakebase is the bottleneck |

The effective Lakebase TX/s ceiling per partition is `1000 / max_dwell_ms`. With N source partitions writing to Lakebase, your cluster can drive up to `N × 1000 / max_dwell_ms` transactions per second.

**Why time-based and not count-based?** A count-based buffer (flush every K rows) has unpredictable latency: at low source rates, K rows can take seconds to accumulate. Time-based dwell bounds worst-case latency directly.

**Caveat:** the dwell check only fires on row arrival. If no rows arrive for a long stretch, the last buffered row stays in memory until either the next row arrives or `close()` is called. Fine for steady streams, less ideal for bursty sources with long quiet gaps.

## Connection budget

Each Lakebase CU has a max-connections cap (see the CU-to-connections table in [computes-and-scaling.md](../../../skills/databricks-lakebase/references/computes-and-scaling.md)). For a realtime app, total Postgres connections at peak =

```
(app's connection pool max × app replicas) + (streaming sink partitions)
```

The streaming sink opens one Postgres connection per partition writing to Lakebase. Both terms must fit under the CU's max-connections cap. At CU_1 the cap is ~209; at CU_4, ~839.

## Target table

Create the Lakebase table once, outside the stream:

```sql
CREATE TABLE IF NOT EXISTS events (
id BIGINT PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL
);

CREATE INDEX IF NOT EXISTS events_ts_idx ON events (timestamp DESC);
```

The primary key is what `ON CONFLICT` upserts against. The descending timestamp index supports the "give me the latest N" query a downstream app will run. Add business columns (metric, category, payload JSON) as your app needs.

## Performance & longevity

The sink performs well for moderate rates and short-lived or append-mostly workloads. The case that needs care is the one it's most often used for — **continuously upserting a small set of hot rows** to serve a live app — where dead-tuple churn can degrade it over a long run.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to discuss the upsert approach that uses Lakebase as a state store, or do you consider that bad practice?


### Dead-tuple bloat on hot-key upserts

`INSERT … ON CONFLICT DO UPDATE` rewrites a row on every flush, and every UPDATE leaves a dead tuple behind. Against a handful of live rows updated several times a second, that's thousands of dead tuples a minute. Autovacuum reclaims them, but its defaults are tuned for large tables, not tiny hot ones (`autovacuum_naptime` is 60s; the scale factor waits for 20% of the table to change). Set aggressive **per-table** autovacuum so cleanup keeps pace:

```sql
ALTER TABLE events SET (
autovacuum_vacuum_scale_factor = 0, -- don't wait for 20% of the table
autovacuum_vacuum_threshold = 200, -- vacuum after N dead tuples
autovacuum_vacuum_cost_delay = 0 -- don't throttle on a hot table
);
```

Watch `n_dead_tup` and `pg_total_relation_size` in `pg_stat_user_tables` — a few-row table holding tens of MB is the symptom, and it slows both this sink's writes and the app's reads.

### Do NOT put Change Data Feed on a high-churn upsert table

Lakehouse Sync / CDF holds a logical **replication slot that pins the xmin horizon**; if the slot lags even slightly, autovacuum can reclaim *nothing* and the upsert table bloats without bound (and `REPLICA IDENTITY FULL`, which CDF requires, doubles the WAL per update). The result is unbounded growth that degrades the whole serving path.

If you need the operational data in Delta, capture it from an **append-only event-log table**, never the upsert table:

- Keep the serving (upsert) tables `REPLICA IDENTITY DEFAULT` and **out of CDF** — autovacuum then reclaims them freely.
- Have the sink *also* insert each row into an append-only `*_events` table and point CDF only at that. Inserts create no dead tuples, so a lagging slot can never bloat it.

For Lakehouse Sync setup itself (CDF from Lakebase → Delta), see [lakehouse-sync.md](../../../skills/databricks-lakebase/references/lakehouse-sync.md).

### Reconnect instead of failing the query

`open()` holds one connection per partition for the epoch, and the default `_flush` re-raises on any error — so a transient disconnect (an autoscaling endpoint bounce, a CDF slot being created, a brief network blip) **kills the whole RTM query** and forces a restart (minutes of downtime plus checkpoint replay). Pull the connect into a helper and reconnect-then-skip instead of letting it propagate:

```python
def _connect(self):
self.conn = psycopg.connect(
host=self.host, port=self.port, dbname=self.database,
user=self.user, password=self.password,
sslmode="require", autocommit=False,
)

# in _flush()'s except block:
except (psycopg.OperationalError, psycopg.InterfaceError):
try:
self.conn.close()
except Exception:
pass
self._connect() # one tick dropped; the stream survives
```

This trades one flush window of buffered rows on each transient blip for stream availability. The strict at-least-once alternative is to keep the default `raise` and let RTM restart from checkpoint — see "Delivery semantics" above.
Loading