Add connection_id parameter to all four Kafka publish methods (publish_async, publish, publish_event, publish_event_async) and to the WebRTCOfferRequest and PipelineLoadRequest schemas. This enables correlating server-side Kafka events with fal.ai WebSocket connections. All parameters default to None so existing callers work unchanged.

Signed-off-by: emranemran <emran.mah@gmail.com>
Pass connection_id from the WebRTC offer request through Session, tracks, FrameProcessor, PipelineProcessor, and PipelineManager so all Kafka events include the fal.ai WebSocket connection ID for correlation. Also inject connection_id into offer and pipeline load requests from fal_app.py to the scope server.

Signed-off-by: emranemran <emran.mah@gmail.com>
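A minimal sketch of that pass-through, assuming heavily simplified constructors; only the `connection_id` parameter and the component names come from this PR, the rest is illustrative:

```python
# Illustrative only: the real Session and FrameProcessor take additional
# arguments not shown here.


class FrameProcessor:
    def __init__(self, connection_id: str | None = None):
        # Stored so Kafka events emitted downstream can carry the originating
        # fal.ai WebSocket connection ID.
        self.connection_id = connection_id


class Session:
    def __init__(self, connection_id: str | None = None):
        self.connection_id = connection_id
        # Forwarded down the chain; defaults to None so non-fal callers are unaffected.
        self.frame_processor = FrameProcessor(connection_id=connection_id)


# Hypothetical call site in the WebRTC offer handler:
session = Session(connection_id="fal-ws-1234")
```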
Replace the inline KafkaPublisher class (which had a different API and defaulted to topic "network_events") with a lazy import of the server's standardized KafkaPublisher. Update websocket_connected and websocket_disconnected publish calls to use the standardized publish_async() API with named parameters. All events now go to the KAFKA_TOPIC env var (default "scope-events") and use the same envelope structure.

Signed-off-by: emranemran <emran.mah@gmail.com>
Summary
- Add a `connection_id` parameter to the Kafka publisher API so all server-side events can be correlated with the fal.ai WebSocket connection that triggered them
- Pass `connection_id` from the WebRTC offer through Session, tracks, FrameProcessor, PipelineProcessor, and PipelineManager
- Remove the inline `KafkaPublisher` class from `fal_app.py` and replace it with a lazy import of the server's standardized publisher
Problem
- `fal_app.py` had its own `KafkaPublisher` class with a different API (`publish(event_type, data_dict)`) and a different default topic (`"network_events"`)
- Server-side Kafka events had no `connection_id`, making it impossible to correlate them with the fal.ai WebSocket connection
Changes
Commit 1: Add `connection_id` to the Kafka publisher API and schemas

- `connection_id: str | None = None` added to all four publish methods in `kafka_publisher.py` (see the signature sketch after this list)
- `connection_id` field added to the `WebRTCOfferRequest` and `PipelineLoadRequest` schemas
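A rough sketch of the shape of the change, assuming Pydantic request models and an `event_type`/`data` parameter pair; only `connection_id: str | None = None` and the class, method, and schema names are taken from the PR:

```python
from pydantic import BaseModel


class KafkaPublisher:
    async def publish_async(
        self,
        event_type: str,                   # assumed existing parameter
        data: dict | None = None,          # assumed existing parameter
        connection_id: str | None = None,  # new: fal.ai WebSocket connection ID
    ) -> None:
        ...  # same change applied to publish, publish_event, publish_event_async


class WebRTCOfferRequest(BaseModel):
    sdp: str                           # assumed existing field
    type: str                          # assumed existing field
    connection_id: str | None = None   # new optional field; same on PipelineLoadRequest
```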
Commit 2: Thread `connection_id` through server components (8 files)

- `webrtc.py` → `tracks.py` / `cloud_track.py` → `frame_processor.py` → `pipeline_processor.py`
- `pipeline_manager.py`: pipeline load/unload/error events include `connection_id` when triggered from cloud mode
- `fal_app.py`: injects `connection_id` into offer and pipeline load requests to the scope server (see the sketch after this list)
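A sketch of the injection on the fal_app side, assuming the offer is POSTed to the scope server with `httpx`; the endpoint path and the payload keys other than `connection_id` are assumptions:

```python
import httpx


async def send_offer(scope_url: str, sdp: str, connection_id: str | None) -> dict:
    # Hypothetical helper: endpoint path and payload keys besides
    # connection_id are illustrative.
    payload = {"sdp": sdp, "type": "offer"}
    if connection_id is not None:
        # Attach the fal.ai WebSocket connection ID so the scope server can
        # stamp it onto every Kafka event produced for this session.
        payload["connection_id"] = connection_id
    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{scope_url}/offer", json=payload)
        resp.raise_for_status()
        return resp.json()
```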
Commit 3: Remove the duplicate `KafkaPublisher` from `fal_app.py`

- Lazy import of the server's standardized `KafkaPublisher`; all events now use the `KAFKA_TOPIC` env var (default `"scope-events"`) (see the lazy-import sketch below)
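A sketch of the lazy-import pattern, assuming a hypothetical module path and constructor; only the named-parameter `publish_async()` call, the `websocket_connected` event name, and the `KAFKA_TOPIC` default of `"scope-events"` come from the PR:

```python
import os

_publisher = None


def _get_publisher():
    """Import the server's KafkaPublisher only when it is first needed."""
    global _publisher
    if _publisher is None:
        # Hypothetical module path; deferring the import keeps fal_app.py light
        # when Kafka publishing is not configured.
        from server.kafka_publisher import KafkaPublisher
        # Constructor arguments are assumed, not specified in the PR.
        _publisher = KafkaPublisher(topic=os.getenv("KAFKA_TOPIC", "scope-events"))
    return _publisher


async def on_websocket_connected(connection_id: str) -> None:
    # Named-parameter call shape per the PR; other event payload fields omitted.
    await _get_publisher().publish_async(
        event_type="websocket_connected",
        connection_id=connection_id,
    )
```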
Test plan

- `uv run daydream-scope` starts without errors
- With `KAFKA_BOOTSTRAP_SERVERS` set, verify events include `connection_id` when available
- Connection events (`websocket_connected`, `websocket_disconnected`) use the standardized envelope