kafka integration #406

Draft

emranemran wants to merge 8 commits into backend-fal-v6 from kafka2

Conversation


emranemran commented on Feb 6, 2026

Summary

  • Add connection_id parameter to the Kafka publisher API so all server-side events can be correlated
    with the fal.ai WebSocket connection that triggered them
  • Thread connection_id from the WebRTC offer through Session, tracks, FrameProcessor,
    PipelineProcessor, and PipelineManager
  • Remove the duplicate KafkaPublisher class from fal_app.py and replace it with a lazy import of
    the server's standardized publisher

Problem

  • fal_app.py had its own KafkaPublisher class with a different API (publish(event_type, data_dict)) and a different default topic ("network_events")
  • Server-side Kafka events had no connection_id, making it impossible to correlate them with the fal
    WebSocket connection
  • Two publishers meant two different envelope formats for the same system

Changes

Commit 1: Add connection_id to kafka publisher API and schemas

  • connection_id: str | None = None added to all 4 publish methods in kafka_publisher.py
  • connection_id field added to the WebRTCOfferRequest and PipelineLoadRequest schemas (see the
    sketch after this list)
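A minimal sketch of the resulting shapes. The method and schema names come from this PR; every other parameter and field is an illustrative assumption, not the actual implementation:

```python
from pydantic import BaseModel


class KafkaPublisher:
    async def publish_async(
        self,
        event_type: str,
        payload: dict,
        connection_id: str | None = None,  # new: None by default, so existing callers work unchanged
    ) -> None:
        ...  # envelope construction and the produce call are elided


class WebRTCOfferRequest(BaseModel):
    sdp: str                          # assumed existing field
    type: str                         # assumed existing field
    connection_id: str | None = None  # new optional field from this commit
```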

Commit 2: Thread connection_id through server components (8 files)

  • webrtc.py, tracks.py / cloud_track.py, frame_processor.py, pipeline_processor.py
  • pipeline_manager.py: pipeline load/unload/error events include connection_id when triggered from
    cloud mode
  • fal_app.py: injects connection_id into offer and pipeline load requests to the scope server
    (the threading pattern is sketched after this list)
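The threading pattern is roughly the following. Only the layer names are from the PR; constructors, attributes, and handoff points are assumptions:

```python
# Hedged sketch of how connection_id is threaded through the server layers.
class FrameProcessor:
    def __init__(self, connection_id: str | None = None):
        # Forwarded again to PipelineProcessor / PipelineManager so pipeline
        # load/unload/error events can carry it.
        self.connection_id = connection_id


class Session:
    def __init__(self, connection_id: str | None = None):
        # connection_id originates in the WebRTC offer request and is handed
        # down at each layer so every published Kafka event can include it.
        self.connection_id = connection_id
        self.processor = FrameProcessor(connection_id=connection_id)
```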

Commit 3: Remove duplicate KafkaPublisher from fal_app.py

  • Deleted ~100 lines of the duplicate publisher class
  • Lazily imports the server's KafkaPublisher; all events now use the KAFKA_TOPIC env var (default
    "scope-events"). See the sketch below.

Test plan

  • uv run daydream-scope starts without errors
  • With KAFKA_BOOTSTRAP_SERVERS set, verify events include connection_id when available (one
    possible check is sketched after this list)
  • Verify fal-side events (websocket_connected, websocket_disconnected) use the standardized
    envelope
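One possible manual check, not part of the PR: tail the topic with kafka-python and confirm the field appears. The event_type/connection_id payload keys are assumptions about the envelope:

```python
import json
import os

from kafka import KafkaConsumer

# Consume the events topic and print each event's connection_id.
consumer = KafkaConsumer(
    os.environ.get("KAFKA_TOPIC", "scope-events"),
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
    auto_offset_reset="latest",
)
for message in consumer:
    event = json.loads(message.value)
    print(event.get("event_type"), event.get("connection_id"))
```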

Add connection_id parameter to all 4 kafka publish methods
(publish_async, publish, publish_event, publish_event_async) and to
WebRTCOfferRequest and PipelineLoadRequest schemas. This enables
correlating server-side Kafka events with fal.ai WebSocket connections.
All parameters default to None so existing callers work unchanged.

Signed-off-by: emranemran <emran.mah@gmail.com>

Pass connection_id from WebRTC offer request through Session, tracks,
FrameProcessor, PipelineProcessor, and PipelineManager so all Kafka
events include the fal.ai WebSocket connection ID for correlation.

Also inject connection_id into offer and pipeline load requests from
fal_app.py to the scope server.

Signed-off-by: emranemran <emran.mah@gmail.com>

Replace the inline KafkaPublisher class (which had a different API and
defaulted to topic "network_events") with a lazy import of the server's
standardized KafkaPublisher. Update websocket_connected and
websocket_disconnected publish calls to use the standardized
publish_async() API with named parameters. All events now go to the
KAFKA_TOPIC env var (default "scope-events") and use the same envelope
structure.

Signed-off-by: emranemran <emran.mah@gmail.com>
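For reference, the migrated call sites would look roughly like this under the publish_async API described above. The event name is from the PR; parameter names other than connection_id, the payload dict, and the function wrapper are illustrative assumptions:

```python
async def on_connect(publisher, connection_id: str) -> None:
    # Publish through the standardized server publisher instead of the
    # deleted inline class; the event carries the fal.ai connection ID.
    await publisher.publish_async(
        event_type="websocket_connected",
        payload={"source": "fal_app"},
        connection_id=connection_id,
    )
```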