Skip to content

skills(experimental): Real-Time Mode bug fixes + two new references#157

Open
DanJKeeling wants to merge 1 commit into
databricks:mainfrom
DanJKeeling:feature/rtm-content
Open

skills(experimental): Real-Time Mode bug fixes + two new references#157
DanJKeeling wants to merge 1 commit into
databricks:mainfrom
DanJKeeling:feature/rtm-content

Conversation

@DanJKeeling

Copy link
Copy Markdown

Summary

Fixes incorrect RTM facts in two existing references and adds two new references covering RTM essentials and the Lakebase foreach sink — the canonical sink shape for a realtime app whose serving layer is Lakebase.

Scope: experimental/databricks-spark-structured-streaming/ only. No changes to skills/.

Bug fixes (existing references)

references/trigger-and-cost-optimization.md

The RTM coverage made several claims that don't match current docs and wouldn't run as written:

  • .trigger(realTime=True) — boolean form doesn't exist in PySpark; the duration string is required
  • "Photon enabled" — Photon must be off for RTM
  • "Databricks 13.3+" — current minimum is DBR 16.4 LTS
  • "Latency: < 800ms" — current docs cite "as low as 5 ms"

Five instances of these claims across the file are corrected, including the Trigger Selection Guide table, the "Real-Time Mode (RTM) Configuration" section, the cost-strategy block, and the "RTM not working" Common Issues row.

references/kafka-streaming.md

Three patterns were broken as written:

  • Pattern 4 (Event Enrichment): used .trigger(realTime=True) (boolean) AND .join(user_dim, ...) without broadcast() — RTM only supports broadcast stream-static joins, so it wouldn't have run under RTM. Fixed to wrap with broadcast(), add outputMode("update"), use the correct realTime="5 minutes" form, and import broadcast in the pattern's code block.
  • Patterns 5 & 6 (Multi-Topic Routing, Schema Validation with DLQ): both used foreachBatch(...) combined with .trigger(realTime=True), directly contradicting the same file's Pattern 3 correct note that "forEachBatch is NOT supported in RTM". Switched both to .trigger(processingTime="30 seconds") (consistent with the file's Quick Start) with an inline comment on the foreachBatch line.

Also fixed: Performance Tuning table row, Common Issues "High latency" row, and Production Checklist row that repeated the < 800ms framing.

New references

references/real-time-mode.md (175 lines)

The must-know RTM facts in one place:

  • Cluster setup constraints (DBR 16.4 LTS+, Classic compute only, autoscaling/Photon/spot OFF, the spark.databricks.streaming.realTimeMode.enabled flag, Dedicated mode for UDFs)
  • Slot math (total worker vCPUs ≥ sum of partitions across all stages; the [CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT] error)
  • Trigger and output mode (the "5 minutes" is checkpoint interval, not trigger interval)
  • Supported sources/sinks matrix
  • transformWithState per-row behavior (vs micro-batch per-key-per-batch) — the most commonly-missed RTM semantic difference
  • Verifying and observing RTM (physical-plan check for RealTimeStreamScan; the three built-in latency metrics with the sink-write caveat; diagnose-by-metric decision tree)
  • Delivery semantics (exactly-once within Spark, at-least-once for Kafka and foreach)
  • Common error class table

references/lakebase-sink-python.md (255 lines)

A duck-typed Python foreach sink that writes streaming records into Lakebase Postgres — the canonical sink shape for an RTM realtime app whose serving layer is Lakebase. Cross-links to the stable databricks-lakebase skill for general Lakebase mechanics (auth methods, CU-to-connections cap, scaling) rather than restating them:

  • Why this pattern vs foreachBatch + df.write.jdbc()
  • Two non-obvious rules: duck-typed class (no ForeachWriter Python base class — that symbol is Scala-only), and native Postgres password auth (not OAuth) on executors
  • Full LakebaseSink class with time-based commits (max_dwell_ms), INSERT … ON CONFLICT idempotent upserts, per-partition lifecycle
  • Wiring example, delivery semantics (cross-link to real-time-mode.md), max_dwell_ms tuning, connection budget formula tying app-pool size + executor connections to the CU cap
  • Target table DDL
  • Performance & longevity section: dead-tuple bloat on hot-key upserts (per-table autovacuum tuning), the CDF-on-upsert-table anti-pattern (replication slot pins xmin horizon → unbounded bloat), and the reconnect-instead-of-fail pattern for transient disconnects

SKILL.md

Two new nav rows in the Core Patterns table pointing at the new references.

Manifest

manifest.json regenerated by python3 scripts/skills.py generate to include the two new reference files. Only the databricks-spark-structured-streaming skill's file list changes.

Validation

python3 scripts/skills.py validate → "Everything is up to date."

Orthogonality

  • No duplication with stream-static-joins.md (general broadcast pattern stays there; RTM file states the RTM-specific constraint only)
  • No duplication with stateful-operations.md (general watermarks/state stores stay there; my transformWithState content is RTM-specific per-row behavior)
  • No duplication with streaming-best-practices.md (general checklist stays there; my RTM file states stricter hard requirements)
  • No duplication with multi-sink-writes.md (that's foreachBatch; my file is foreach — explicitly contrasts)
  • Cross-links into skills/databricks-lakebase/references/connectivity.md, computes-and-scaling.md, and lakehouse-sync.md rather than restating their content

Fixes incorrect RTM facts in two existing references and adds two new
references covering RTM essentials and the Lakebase foreach sink — the
canonical sink for a realtime app whose serving layer is Lakebase.

Bug fixes (experimental/databricks-spark-structured-streaming/references/):

- trigger-and-cost-optimization.md: Real-Time Mode section claimed
  .trigger(realTime=True) (boolean form doesn't exist; PySpark requires
  the duration), "Photon enabled" (actually Photon must be OFF for RTM),
  "Databricks 13.3+" (current minimum is DBR 16.4 LTS), and "Latency:
  < 800ms" (current docs cite "as low as 5 ms"). Five occurrences of
  these claims across the file are corrected, including the Trigger
  Selection Guide table and the "RTM not working" diagnostic row.

- kafka-streaming.md: Patterns 4-6 used .trigger(realTime=True) (boolean)
  and Patterns 5-6 combined foreachBatch + RTM, directly contradicting
  the same file's Pattern 3 correct note that "forEachBatch is NOT
  supported in RTM". Switched those patterns to processingTime triggers
  with explanatory comments. Updated the "RTM enabled only if latency
  < 800ms required" Production Checklist row and the corresponding
  Common Issues row.

New references:

- real-time-mode.md (175 lines): cluster setup constraints
  (DBR 16.4 LTS+, Classic compute, autoscaling/Photon/spot OFF, the
  realTimeMode.enabled flag), slot math (worker vCPUs >= sum of
  partitions across stages), supported sources/sinks matrix,
  transformWithState per-row behavior (vs micro-batch per-key-per-batch),
  observability (RealTimeStreamScan physical-plan check; the three
  built-in latency metrics with the sink-write caveat; diagnose-by-metric
  decision tree), delivery semantics (exactly-once within Spark,
  at-least-once for Kafka and foreach), error class table.

- lakebase-sink-python.md (205 lines): a duck-typed Python foreach sink
  that writes streaming records into Lakebase Postgres. Cross-links to
  skills/databricks-lakebase/references/connectivity.md for general auth
  methods and computes-and-scaling.md for the CU-to-connections cap;
  adds the streaming-specific rationale for native-password (not OAuth)
  on executors. Documents the LakebaseSink class with time-based commits
  (max_dwell_ms), INSERT ... ON CONFLICT idempotent upserts, and the
  connection-budget formula tying app-pool size + executor connections
  to the CU cap.

SKILL.md: two new nav rows in the Core Patterns table.

manifest.json: regenerated by scripts/skills.py generate to include the
two new reference files.

Signed-off-by: Dan Keeling <dan.keeling@databricks.com>

@peterst28 peterst28 left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fantastic improvement for realtime mode. The existing skills for realtime are anemic and often just plain wrong. My comments are mostly nitpicks and suggestions for future improvements.

@@ -171,12 +171,14 @@ query = (enriched_df
Enrich events with dimension data:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we recommend RTM for kafka->kafka streams? I think it's better than microbatch for this case.

parsed_df.writeStream \
.foreachBatch(route_events) \
.trigger(realTime=True) \
.foreachBatch(route_events) \ # foreachBatch is NOT supported in RTM — use processingTime

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment is probably not necessary here, although it's useful for the reviewer. :)

source_df.writeStream \
.foreachBatch(validate_and_route) \
.trigger(realTime=True) \
.foreachBatch(validate_and_route) \ # foreachBatch is NOT supported in RTM — use processingTime

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same. Comment not necessary.

| maxOffsetsPerTrigger | 10,000-100,000 | Balance latency vs throughput |
| trigger interval | Business SLA / 3 | Recovery time buffer |
| RTM | Only if < 800ms required | Microbatch more cost-effective |
| RTM | When sub-second E2E latency is required | Micro-batch is more cost-effective for second-or-longer SLAs |

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend RTM for kafka -> kafka?

- [ ] Unique checkpoint per pipeline
- [ ] Fixed-size cluster (no autoscaling for streaming/RTM)
- [ ] RTM enabled only if latency < 800ms required
- [ ] RTM enabled only when sub-second E2E latency is required

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too strong? Good for kafka -> kafka

| Metric | What it measures |
|---|---|
| `processingLatencyMs` | Read-to-write inside the query — how long the pipeline takes to process a record |
| `sourceQueuingLatencyMs` | Source-append-time (e.g. Kafka log append) to first read by the query. Captures inter-batch time, message-bus queuing, upstream batching. |

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution: This number and e2eLatencyMs will be long if you're processing records that have been sitting in Kafka. This is an easy mistake to make, I think.


**Caveat: `e2eLatencyMs` does not currently include the sink write time.** If perceived latency is higher than `e2eLatencyMs` suggests, the gap is in the sink.

Read these metrics via a `StreamingQueryListener` or by inspecting `query.lastProgress`.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users can see it in the UI. Worth mentioning?

| High `processingLatencyMs` | Slow operator inside the query | Per-stage metrics (set `spark.databricks.streaming.execution.enableDebugMetrics = true`); CPU profile |
| High `sourceQueuingLatencyMs` | Inter-batch time too long, or upstream source latency | Inter-batch time in driver logs; Kafka `kafka.fetch.max.wait.ms` (default 500 ms — drop to 50 for low latency); upstream batching |
| `e2eLatencyMs` looks fine but the app feels slow | Sink write time (not in `e2eLatencyMs`) | Measure sink flush duration directly |
| Latency climbing over time | Memory pressure or GC growing | Executor stdout for Full GC events (long `user` CPU times); cluster restart as immediate mitigation |

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh boy. This is a complicated topic. We did a whole talk on it!

RTM preserves Structured Streaming's standard fault-tolerance guarantees:

- **Exactly-once within Spark.** Operators, state stores, and supported sinks are all exactly-once.
- **At-least-once for Kafka and `foreach` sinks.** This is the boundary case to watch — anywhere data leaves Spark via `foreach` (custom writers, side effects) or is written to Kafka, the same record may be delivered more than once on restart or task retry. Design custom sinks to be idempotent.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kafka is at least once? Is that documented somewhere?


## Performance & longevity

The sink performs well for moderate rates and short-lived or append-mostly workloads. The case that needs care is the one it's most often used for — **continuously upserting a small set of hot rows** to serve a live app — where dead-tuple churn can degrade it over a long run.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to discuss the upsert approach that uses Lakebase as a state store, or do you consider that bad practice?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants