
Decouple vision agents from getstream #330

Merged
dangusev merged 22 commits into main from chore/getstream-decouple
Feb 6, 2026

Conversation

@dangusev (Collaborator) commented Feb 3, 2026

Goal - reduce dependency on getstream library across vision_agents (except plugins/getstream)

Summary by CodeRabbit

  • New Features

    • Added Call abstraction, unified TrackType names (VIDEO, SCREEN_SHARE) and participant-centric TTS/track handling.
  • Refactor

    • Edge/Connection moved to async, event-driven model; call creation/join flows revised and audio track model simplified (AudioStreamTrack replaces queued track).
    • GetStream SFU event artifacts relocated into the GetStream plugin and import paths updated.
  • Documentation

    • Updated protobuf generation and plugin import guidance.
  • Tests

    • Expanded tests for multi-event registration and SFU event handling.

SFU events are the implementation detail of the getstream plugin
- register required events by default
- removed unused EventEmitter base class
- update StreamEdge

coderabbitai bot commented Feb 3, 2026

📝 Walkthrough

Edge and GetStream plugin integration refactor: EdgeTransport is now a generic ABC with an EventManager and new Call protocol; core types (TrackType, Connection, Participant) and TTS/audio handling were modernized; SFU protobuf events moved into the GetStream plugin and StreamEdge maps SFU types → core types and emits core events.
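
The conversion step described in the walkthrough can be sketched roughly as follows. This is a minimal illustration, not the plugin's actual code: the raw event is modeled as a plain dict, the `AUDIO` member and the `_RAW_TO_CORE` keys are assumptions, and only `TrackType.VIDEO`, `TrackType.SCREEN_SHARE`, and `Participant(original=..., user_id=...)` are taken from this PR.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class TrackType(Enum):
    """Transport-agnostic track kinds used by the core."""
    AUDIO = "audio"  # assumed member; the PR summary names only VIDEO and SCREEN_SHARE
    VIDEO = "video"
    SCREEN_SHARE = "screen_share"


@dataclass
class Participant:
    """Core participant that keeps a reference to the transport's own object."""
    original: Any
    user_id: str


@dataclass
class TrackAddedEvent:
    """Core event that handlers receive instead of the raw SFU event."""
    track_id: str
    track_type: TrackType
    participant: Participant


# Hypothetical raw type names standing in for the plugin's SFU enum values.
_RAW_TO_CORE = {
    "audio": TrackType.AUDIO,
    "video": TrackType.VIDEO,
    "screen_share": TrackType.SCREEN_SHARE,
}


def to_core_event(raw: dict) -> TrackAddedEvent:
    """Convert a raw, transport-specific event into a core event."""
    raw_participant = raw["participant"]
    return TrackAddedEvent(
        track_id=raw["track_id"],
        track_type=_RAW_TO_CORE[raw["type"]],
        participant=Participant(
            original=raw_participant, user_id=raw_participant["user_id"]
        ),
    )


event = to_core_event(
    {"track_id": "t1", "type": "screen_share", "participant": {"user_id": "alice"}}
)
```

With this shape, agent code depends only on the core `TrackType` and `Participant`, while the transport-specific object stays reachable through `participant.original`.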

Changes

| Cohort / File(s) | Summary |
|---|---|
| Agent core: agents-core/vision_agents/core/agents/agents.py | Switched types to EdgeTransport/Connection, modernized typing (built-in dict/list), moved call creation to edge.create_call(...), changed track/event handlers to participant-centric Participant and TrackType enum, updated TTS to accept Participant. |
| Edge core types & events: agents-core/vision_agents/core/edge/types.py, agents-core/vision_agents/core/edge/events.py, agents-core/vision_agents/core/edge/call.py | Added TrackType enum and Call Protocol; Connection converted to abc.ABC with lifecycle methods; removed OutputAudioTrack; event types now reference TrackType and drop legacy user fields. |
| EdgeTransport & EventManager: agents-core/vision_agents/core/edge/edge_transport.py, agents-core/vision_agents/core/events/manager.py | EdgeTransport became Generic[Call], owns events: EventManager, and exposes async create_call/join/create_user/publish_tracks/send_custom_event; EventManager.register now accepts variadic event classes. |
| GetStream plugin & SFU events: plugins/getstream/_generate_sfu_events.py, plugins/getstream/PROTOBUF_GENERATION.md, plugins/getstream/vision_agents/plugins/getstream/sfu_events.py, plugins/getstream/vision_agents/plugins/getstream/__init__.py | Moved protobuf generation and sfu_events into the GetStream plugin, updated generation paths/docs, and re-exported GetStream event models from the plugin. |
| StreamEdge implementation: plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py | StreamEdge adapted to EdgeTransport[StreamCall], added conversion helpers _to_core_track_type/_to_core_participant, implemented create_call/join/publish/demo flows, and re-emits converted core events via EventManager. |
| TTS / audio / utils: agents-core/vision_agents/core/tts/tts.py, agents-core/vision_agents/core/utils/audio_track.py, plugins/openai/vision_agents/plugins/openai/rtc_manager.py | TTS APIs now accept participant: Optional[Participant]; removed QueuedAudioTrack and migrated internal usage to AudioStreamTrack/PcmData; audio param renamed framerate → sample_rate in places. |
| Events public exports: agents-core/vision_agents/core/events/__init__.py, agents-core/vision_agents/core/events/base.py | Core events package reduced exported SFU event names; core exposes ConnectionState and EventManager while SFU-specific event classes are provided by the GetStream plugin. |
| Tests & examples: plugins/getstream/tests/*, tests/*, plugins/*/examples/*, docs/* | Added SFU event tests and updated tests/examples/docs to new imports, enum names (e.g., TrackType.VIDEO), create_audio_track(sample_rate=...), Call test helpers, and AudioStreamTrack usage. |

Sequence Diagram(s)

sequenceDiagram
  participant Agent as Agent
  participant Edge as EdgeTransport
  participant Events as EventManager
  participant Stream as GetStream

  Agent->>Edge: create_call(call_id, agent_user_id, call_type)
  Edge->>Stream: provision/get StreamCall
  Stream-->>Edge: StreamCall
  Edge->>Edge: create_audio_track(), publish_tracks(audio, video)
  Edge->>Events: register Track/Audio events
  Stream-->>Edge: raw SFU event
  Edge->>Edge: convert SFU → core Participant/TrackType
  Edge->>Events: emit TrackAddedEvent(participant, TrackType)
  Events-->>Agent: on_track_added handler invoked
  Agent->>Edge: say(text, participant)
  Edge->>Events: emit TTSAudioEvent(participant, pcm)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • Nash0x7E2
  • maxkahan

Poem

The room is glass; my voice a blade of ice,
it shears and stitches silence into shape.
A small bright edge remembers every name,
and gives it back as sound that trembles, slow—
a patient burn that learns to whisper home.

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 46.49% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'Decouple vision agents from getstream' clearly and accurately summarizes the main objective of this PR, which involves removing hard dependencies on getstream from the core library while keeping it isolated in the plugins/getstream module. |


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/getstream/_generate_sfu_events.py (1)

47-52: ⚠️ Potential issue | 🟡 Minor

Bare except Exception: in generated code template violates guidelines.

The HEADER_LINES template includes a bare exception handler that will propagate into the generated sfu_events.py file. Consider catching a more specific exception type.

Proposed fix
 "def _to_dict(message) -> Dict[str, Any]:",
 "    try:",
 "        return MessageToDict(message, preserving_proto_field_name=True)",
-"    except Exception:",
+"    except (ValueError, TypeError):",
 "        return {}",

As per coding guidelines: Never write except Exception as e - use specific exception handling.

🤖 Fix all issues with AI agents
In `@agents-core/vision_agents/core/events/manager.py`:
- Around line 174-187: The loop that registers classes (for event_class in
event_classes) early-returns when seeing a class whose name
endswith("BaseEvent"), causing later classes to be skipped; change the behavior
to skip that one entry and continue the loop instead of returning (replace the
`return` in the branch `elif event_class.__name__.endswith("BaseEvent"):` with
`continue`) so subsequent event classes are still processed and self._events,
logger.debug, and the error/warning handling remain unchanged.

In `@tests/test_events.py`:
- Around line 47-64: Rename the test function to reflect its actual behavior:
change the function name from test_register_events_from_module_raises_name_error
to something like test_register_events_from_module_and_handler_execution so it
matches that it registers events via EventManager.register_events_from_module on
the dummy module, subscribes a handler with manager.subscribe, sends an event
with manager.send, awaits manager.wait, and asserts the handler ran; update only
the test function name to keep the assertions and setup unchanged.
- Around line 129-140: The test function name and its assertion disagree: rename
the test function test_send_unknown_event_type_raises_key_error to
test_send_unknown_event_type_raises_runtime_error to reflect the actual
behavior, or if KeyError is intended change the assertion to expect KeyError
instead; locate the test function definition
(test_send_unknown_event_type_raises_key_error) in tests/test_events.py and
update either the function name or the pytest.raises(...) call accordingly when
exercising EventManager.send with UnregisteredEvent (the dynamic dataclass) and
ignore_unknown_events=False.
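
The early-return issue flagged for `EventManager.register` can be illustrated with a stripped-down stand-in. `MiniEventManager` and the three event classes below are hypothetical and exist only for this sketch; the real manager has additional validation that is omitted here.

```python
import logging

logger = logging.getLogger(__name__)


class MiniEventManager:
    """Hypothetical, heavily simplified stand-in for the real EventManager."""

    def __init__(self) -> None:
        self._events = {}

    def register(self, *event_classes: type) -> None:
        for event_class in event_classes:
            if event_class.__name__.endswith("BaseEvent"):
                # Skip only this entry. A `return` here would silently drop
                # every class that follows it in the argument list.
                continue
            self._events[event_class.__name__] = event_class
            logger.debug("Registered %s", event_class.__name__)


class TrackAddedEvent:
    pass


class PluginBaseEvent:
    pass


class CallEndedEvent:
    pass


manager = MiniEventManager()
manager.register(TrackAddedEvent, PluginBaseEvent, CallEndedEvent)
registered = sorted(manager._events)
```

With `continue`, `CallEndedEvent` is still registered even though it follows a base class in the argument list.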
🧹 Nitpick comments (2)
plugins/getstream/tests/test_sfu_events.py (1)

46-47: Consider pytest.approx for floating-point comparisons.

Using pytest.approx is more idiomatic and provides clearer error messages on failure.

Optional improvement
-        assert abs(received_audio_events[0].level - 0.85) < 0.01
+        assert received_audio_events[0].level == pytest.approx(0.85, abs=0.01)
-        assert abs(received_audio_events[0].level - 0.95) < 0.01
+        assert received_audio_events[0].level == pytest.approx(0.95, abs=0.01)

Also applies to: 62-63

tests/test_events.py (1)

109-109: Consider using nonlocal for consistency.

The test at line 81-86 uses nonlocal value for mutating a counter, while here a dict workaround is used. For consistency within the same file, consider using nonlocal here as well.

♻️ Suggested refactor
-        # Counter to ensure recursive handler is invoked
-        recursive_counter = {"count": 0}
+        # Counter to ensure recursive handler is invoked
+        recursive_count = 0

         @manager.subscribe
         async def failing_handler(event: ValidEvent):
             raise RuntimeError("Intentional failure")

         @manager.subscribe
         async def exception_handler(event: ExceptionEvent):
+            nonlocal recursive_count
             # Increment the counter each time the exception handler runs
-            recursive_counter["count"] += 1
+            recursive_count += 1
             # Re-raise the exception only once to trigger a second recursion
-            if recursive_counter["count"] == 1:
+            if recursive_count == 1:
                 raise ValueError("Re-raising in exception handler")

         manager.send(ValidEvent(field=10))
         await manager.wait()

         # After processing, the recursive counter should be 2 (original failure + one re-raise)
-        assert recursive_counter["count"] == 2
+        assert recursive_count == 2

@dangusev dangusev marked this pull request as ready for review February 5, 2026 16:12
@Nash0x7E2 Nash0x7E2 self-requested a review February 5, 2026 16:16

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
agents-core/vision_agents/core/tts/tts.py (1)

273-285: ⚠️ Potential issue | 🟡 Minor

Broad exception handling violates coding guidelines.

The except Exception as e catch-all swallows every possible error type. Consider catching more specific exceptions (e.g., TypeError, ValueError, or provider-specific errors from the TTS synthesis) to avoid masking unexpected failures.

As per coding guidelines: "Never write except Exception as e - use specific exception handling."

🤖 Fix all issues with AI agents
In `@agents-core/vision_agents/core/edge/edge_transport.py`:
- Line 5: The import of AudioStreamTrack and the concrete return type in
create_audio_track() break the generics-based decoupling; change
create_audio_track() to return the generic type T_AudioTrack instead of
getstream.video.rtc.AudioStreamTrack, remove the direct AudioStreamTrack import
from this module, and ensure any references in edge_transport.py use the
T_AudioTrack generic (and not the concrete class) so the core public API no
longer exposes getstream types; also scan for usages of AudioStreamTrack or
PcmData in this module and adapt them to the abstract/generic types or local
protocol interfaces to eliminate the direct getstream dependency.

In `@plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py`:
- Around line 377-387: The create_call method currently ignores its
agent_user_id parameter and uses self.agent_user_id, which may be None; update
create_call to use the agent_user_id argument when calling call.get_or_create
(i.e., pass data={"created_by_id": agent_user_id}) or make agent_user_id a
required parameter and validate it before calling call.get_or_create; ensure you
update any callers or add a clear check that raises if agent_user_id is missing
so call.get_or_create always receives a non-None created_by_id.

In `@tests/test_agent_tracks.py`:
- Around line 124-126: The test helper create_audio_track returns a
unittest.mock.Mock (Mock) which violates the guideline to avoid mocks; replace
it with a minimal stub object that exposes the same attribute(s) used by tests
(e.g., an object with an id attribute like "audio_track_1"). Update the
create_audio_track method to instantiate and return that lightweight stub (or a
simple dataclass/namespace) instead of Mock so behavior is explicit and tests
don't depend on unittest.mock.
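
Two of the suggestions above (returning the generic `T_AudioTrack` from `create_audio_track()` and replacing `Mock` with a lightweight stub) might look like this in combination. It is a sketch under assumptions: `StubAudioTrack` is a hypothetical test type, the `sample_rate`/`stereo` parameters mirror the signature proposed elsewhere in this review, and the real `EdgeTransport` has many more abstract methods.

```python
import abc
from dataclasses import dataclass
from typing import Generic, TypeVar

T_AudioTrack = TypeVar("T_AudioTrack")


class EdgeTransport(abc.ABC, Generic[T_AudioTrack]):
    """Core contract: no transport-specific audio type appears in the signature."""

    @abc.abstractmethod
    def create_audio_track(
        self, sample_rate: int = 48000, stereo: bool = True
    ) -> T_AudioTrack:
        """Create the outgoing audio track for this transport."""


@dataclass
class StubAudioTrack:
    """Hypothetical test stub exposing only the attributes the tests read."""
    id: str
    sample_rate: int
    channels: int


class DummyEdge(EdgeTransport[StubAudioTrack]):
    def create_audio_track(
        self, sample_rate: int = 48000, stereo: bool = True
    ) -> StubAudioTrack:
        # Forward the parameters instead of hardcoding them.
        return StubAudioTrack(
            id="audio_track_1",
            sample_rate=sample_rate,
            channels=2 if stereo else 1,
        )


track = DummyEdge().create_audio_track(sample_rate=16000, stereo=False)
```

A plugin would bind the type variable to its own track class in the same way, so the concrete type never needs to be imported by the core module.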
🧹 Nitpick comments (6)
tests/test_agents/test_agents.py (1)

51-56: Forward sample_rate and stereo parameters through to AudioStreamTrack instead of hardcoding them.

Currently, DummyEdge.create_audio_track() accepts *args, **kwargs but silently ignores them, then hardcodes sample_rate=48000 and channels=2. This masks bugs when callers attempt to pass different values. The production StreamEdge implementation already uses the recommended pattern—accept parameterized defaults and forward them through.

♻️ Suggested adjustment
-    def create_audio_track(self, *args, **kwargs) -> AudioStreamTrack:
-        return AudioStreamTrack(
-            audio_buffer_size_ms=300_000,
-            sample_rate=48000,
-            channels=2,
-        )
+    def create_audio_track(
+        self, sample_rate: int = 48000, stereo: bool = True
+    ) -> AudioStreamTrack:
+        return AudioStreamTrack(
+            audio_buffer_size_ms=300_000,
+            sample_rate=sample_rate,
+            channels=2 if stereo else 1,
+        )
agents-core/vision_agents/core/tts/tts.py (1)

88-114: Consider alternatives to hasattr for type checking.

The hasattr calls at lines 95 and 104 check for __aiter__ and __iter__ protocols. While this duck-typing pattern is common, the coding guidelines discourage hasattr. You could use isinstance checks with typing.AsyncIterable and typing.Iterable, or use try/except with the iteration itself.

As per coding guidelines: "Avoid using getattr, hasattr, delattr and setattr; prefer normal attribute access in Python."

♻️ Proposed alternative using isinstance
+from typing import AsyncIterable, Iterable
+
 async def _iter_pcm(self, resp: Any) -> AsyncGenerator[PcmData, None]:
     """Yield PcmData chunks from a provider response of various shapes."""
     # Single buffer or PcmData
     if isinstance(resp, (PcmData,)):
         yield resp
         return
     # Async iterable
-    if hasattr(resp, "__aiter__"):
+    if isinstance(resp, AsyncIterable):
         async for item in resp:
             if not isinstance(item, PcmData):
                 raise TypeError(
                     "stream_audio must yield PcmData; wrap provider bytes via PcmData.from_response in the plugin"
                 )
             yield item
         return
     # Sync iterable
-    if hasattr(resp, "__iter__") and not isinstance(
+    if isinstance(resp, Iterable) and not isinstance(
         resp, (bytes, bytearray, memoryview, str)
     ):
agents-core/vision_agents/core/edge/edge_transport.py (1)

119-125: Missing docstrings and return type annotation.

create_conversation lacks a docstring and return type annotation. add_track_subscriber also lacks a docstring. For an abstract base class defining a public contract, these should be documented.

📝 Proposed docstrings
 @abc.abstractmethod
-async def create_conversation(self, call: Call, user: User, instructions: str):
+async def create_conversation(self, call: Call, user: User, instructions: str) -> Any:
+    """Create a conversation/chat channel for the call.
+
+    Args:
+        call: The call to create conversation for.
+        user: The user creating the conversation.
+        instructions: System instructions for the conversation.
+
+    Returns:
+        A conversation object for message exchange.
+    """
     pass

 @abc.abstractmethod
 def add_track_subscriber(self, track_id: str) -> Optional[aiortc.VideoStreamTrack]:
+    """Subscribe to a remote video track.
+
+    Args:
+        track_id: The ID of the track to subscribe to.
+
+    Returns:
+        The subscribed video track, or None if subscription failed.
+    """
     pass
agents-core/vision_agents/core/agents/agents.py (2)

718-729: Dynamic attribute access via getattr.

The _apply method uses getattr twice (lines 720, 722) for dynamic method invocation. While this is a common pattern for plugin systems, the coding guidelines discourage getattr. Consider a registry-based approach or explicit method dispatch if this becomes problematic.

Additionally, the except Exception as e at line 726 is overly broad.

As per coding guidelines: "Avoid using getattr, hasattr, delattr and setattr" and "Never write except Exception as e."
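
One possible shape for the registry-based alternative is sketched below. Everything here is hypothetical (`Plugin`, its methods, and the `_ACTIONS` table); it only shows how an explicit mapping can replace a `getattr` lookup, and it does not reproduce what `_apply` actually does.

```python
from typing import Callable


class Plugin:
    """Hypothetical plugin with two lifecycle methods."""

    def start(self) -> str:
        return "started"

    def stop(self) -> str:
        return "stopped"


# Explicit registry: every supported action is listed, so a misspelled name
# fails loudly instead of being resolved (or missed) dynamically.
_ACTIONS: dict[str, Callable[[Plugin], str]] = {
    "start": Plugin.start,
    "stop": Plugin.stop,
}


def apply(plugin: Plugin, action: str) -> str:
    try:
        handler = _ACTIONS[action]
    except KeyError:
        raise ValueError(f"Unsupported action: {action!r}") from None
    return handler(plugin)


outcome = apply(Plugin(), "start")
```

The narrow `except KeyError` also sidesteps the broad `except Exception` concern raised in the same comment.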


19-19: Clarify architectural requirements for external type abstraction.

The getstream imports at line 19 are part of a systemic pattern: PcmData is used across 9 core modules (vad, tts, stt, llm, processors, edge, utils) and AudioStreamTrack appears in both agents.py and edge_transport.py. These are third-party library types fundamental to audio processing and RTC infrastructure. Before recommending abstraction, establish whether decoupling from external library types is a stated architectural priority and whether implementation swapping is planned. If it is, create wrapper abstractions; otherwise, this pattern reflects appropriate use of external dependencies.

plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)

63-82: getattr usage for user_id access.

Line 78 uses getattr(participant, "user_id", None) instead of direct attribute access. Since both sfu_events.Participant and StreamParticipant should have user_id attributes, consider direct access with appropriate type narrowing.

As per coding guidelines: "Avoid using getattr, hasattr, delattr and setattr; prefer normal attribute access in Python."

♻️ Proposed fix
 def _to_core_participant(
     participant: sfu_events.Participant | StreamParticipant | None,
 ) -> Participant | None:
     if participant is None:
         return None

-    user_id = getattr(participant, "user_id", None)
+    user_id = participant.user_id if hasattr(participant, "user_id") else None
+    # Or with direct access if both types are guaranteed to have user_id:
+    # user_id = participant.user_id
     if not user_id:
         return None

     return Participant(original=participant, user_id=user_id)

Note: If both input types are guaranteed to have user_id, direct access participant.user_id would be preferred. The hasattr alternative still violates guidelines but is more explicit about the optional nature.

This will make creating calls more flexible, while we are still tied to `aiortc.MediaStreamTrack`, making previous generics less useful.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
tests/test_agents/test_agents.py (1)

43-44: ⚠️ Potential issue | 🟡 Minor

self.events overwritten immediately after parent registers required events.

Line 43 calls super().__init__(), which (per the EdgeTransport base class) creates an EventManager and registers the required events (AudioReceivedEvent, TrackAddedEvent, TrackRemovedEvent, CallEndedEvent). Line 44 then replaces it with a bare EventManager(), discarding those registrations. If any test path or the Agent internals checks for registered events, this will silently break.

Suggested fix: remove the redundant reassignment
     def __init__(
         self,
         exc_on_join: Optional[Exception] = None,
         exc_on_publish_tracks: Optional[Exception] = None,
     ):
         super(DummyEdge, self).__init__()
-        self.events = EventManager()
         self.exc_on_join = exc_on_join
         self.exc_on_publish_tracks = exc_on_publish_tracks
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (4)

117-128: 🛠️ Refactor suggestion | 🟠 Major

Replace broad except Exception as e with specific exception types.

Lines 120–126 already handle asyncio.TimeoutError and RuntimeError. The catch-all on line 127 violates the project guideline. If you need to handle additional known failure modes from the RTC stack, catch those specifically (e.g., OSError, ConnectionError).

Suggested fix
         except RuntimeError as e:
             if "asynchronous generator" in str(e):
                 logger.debug(f"Ignoring async generator error during shutdown: {e}")
             else:
                 raise
-        except Exception as e:
-            logger.error(f"Error during connection close: {e}")
+        except OSError as e:
+            logger.error(f"Error during connection close: {e}")

As per coding guidelines: "Never write except Exception as e - use specific exception handling."


313-323: 🛠️ Refactor suggestion | 🟠 Major

Replace hasattr with isinstance check to distinguish event types.

The event is already typed as a union of TrackUnpublishedEvent | ParticipantLeftEvent. Using isinstance is both type-safe and avoids the hasattr guideline violation. Also, event.payload is not None on line 314 is redundant since line 302 already guards against that.

Suggested fix
         # Determine which tracks to remove
-        if hasattr(event.payload, "type") and event.payload is not None:
+        if isinstance(event, sfu_events.TrackUnpublishedEvent):
             # TrackUnpublishedEvent - single track
             tracks_to_remove = [event.payload.type]
             event_desc = "Track unpublished"

As per coding guidelines: "Avoid using getattr, hasattr, delattr and setattr; prefer normal attribute access in Python."


589-595: 🛠️ Refactor suggestion | 🟠 Major

Broad except Exception as e when opening browser.

webbrowser.open typically raises webbrowser.Error or OSError. Catch those specifically.

Suggested fix
         try:
             # Run webbrowser.open in a separate thread to avoid blocking the event loop
             await asyncio.to_thread(webbrowser.open, url)
             logger.info("✅ Browser opened successfully!")
-        except Exception as e:
+        except (webbrowser.Error, OSError) as e:
             logger.error(f"❌ Failed to open browser: {e}")
             logger.warning(f"Please manually open this URL: {url}")

As per coding guidelines: "Never write except Exception as e - use specific exception handling."


500-514: ⚠️ Potential issue | 🟡 Minor

send_custom_event — clean guard, but self.agent_user_id may be None.

Line 514 passes self.agent_user_id which could be None if create_user was never called. This is the same root cause as the create_call issue. Consider adding a guard or making agent_user_id required at construction time.

🤖 Fix all issues with AI agents
In `@plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py`:
- Around line 75-82: Replace the getattr call with direct attribute access since
sfu_events.Participant/StreamParticipant always define user_id: after the
existing None guard for participant, read user_id = participant.user_id (instead
of getattr) and keep the subsequent None check and return
Participant(original=participant, user_id=user_id); this removes the getattr
usage while preserving the existing None-handling logic around participant and
user_id.
- Around line 497-498: The close() method currently only nulls self._call which
can leak resources; update close() to fully tear down the stream connection by
(1) calling or awaiting the underlying StreamConnection.close() /
ConnectionManager disconnect routine on self._real_connection (or invoking its
cleanup/shutdown API) to stop network activity, (2) stopping and removing any
MediaStreamTracks and clearing self._track_map and self._pending_tracks, (3)
cancelling/awaiting any pending tasks related to conversation and clearing
self.conversation, and finally setting self._call and self._real_connection to
None; reference the methods/attributes close, _real_connection, _track_map,
_pending_tracks, conversation and ensure any async shutdown is awaited to
prevent leaks.
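
A fuller teardown along the lines requested for `close()` might be ordered as in the sketch below. The fake connection and track classes are hypothetical, and `_published_tracks` is an assumed attribute introduced for illustration; in the real class the track objects may be held elsewhere, and the underlying connection's shutdown API may differ.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for the underlying stream connection."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class FakeTrack:
    """Hypothetical stand-in for a media track with a stop() method."""

    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True


class EdgeSketch:
    def __init__(self, connection: FakeConnection, tracks: list) -> None:
        self._call = object()
        self._real_connection = connection
        self._published_tracks = list(tracks)  # assumed attribute
        self._track_map = {"t1": {"track_id": "t1", "published": True}}
        self._pending_tracks = {}
        self.conversation = object()

    async def close(self) -> None:
        # 1. Stop network activity first and wait for it to finish.
        if self._real_connection is not None:
            await self._real_connection.close()
        # 2. Stop media tracks and drop bookkeeping state.
        for track in self._published_tracks:
            track.stop()
        self._published_tracks.clear()
        self._track_map.clear()
        self._pending_tracks.clear()
        # 3. Release remaining references last.
        self.conversation = None
        self._call = None
        self._real_connection = None


connection = FakeConnection()
track = FakeTrack()
edge = EdgeSketch(connection, [track])
asyncio.run(edge.close())
```

Awaiting the connection shutdown before clearing state keeps late events from touching half-cleared maps.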
🧹 Nitpick comments (3)
tests/test_agents/test_agents.py (1)

56-61: create_audio_track — minor signature deviation from abstract, but acceptable for a test double.

The abstract method declares no positional/keyword args beyond self, but this override accepts *args, **kwargs. Functionally harmless, though aligning signatures makes intent clearer.

agents-core/vision_agents/core/edge/edge_transport.py (1)

133-148: create_conversation uses Call protocol — minor docstring gap; send_custom_event is clean.

create_conversation (line 134) and add_track_subscriber (line 138) lack docstrings, unlike every other abstract method in this class. Consider adding them for consistency.

plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)

273-287: Dead guard: if not is_agent_track is always True here.

Agent tracks already trigger an early return on line 222–226, so by line 278, is_agent_track is guaranteed False. The condition can be removed to reduce cognitive noise.

Suggested simplification
         if track_id:
             # Store with correct type from SFU
             self._track_map[track_key] = {"track_id": track_id, "published": True}
 
-            # Only emit TrackAddedEvent for remote participants, not for agent's own tracks
-            if not is_agent_track:
-                # NOW spawn TrackAddedEvent with correct type
-                self.events.send(
-                    events.TrackAddedEvent(
-                        plugin_name="getstream",
-                        track_id=track_id,
-                        track_type=track_type,
-                        participant=_to_core_participant(event.participant),
-                    )
-                )
+            # NOW spawn TrackAddedEvent with correct type
+            self.events.send(
+                events.TrackAddedEvent(
+                    plugin_name="getstream",
+                    track_id=track_id,
+                    track_type=track_type,
+                    participant=_to_core_participant(event.participant),
+                )
+            )

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py`:
- Around line 55-60: The helper _to_core_track_type currently raises ValueError
for unknown StreamTrackType which can crash async handlers; change
_to_core_track_type (and its use of _TRACK_TYPE_MAP/StreamTrackType) to return
None (or an UNSPECIFIED sentinel) and log a warning when the input is
unrecognized instead of raising; then update callers _on_track_published and
_on_track_removed to guard the return value (e.g., track_type =
_to_core_track_type(...); if track_type is None: return) so unknown track types
are skipped and do not mutate _track_map or _pending_tracks.
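
The suggested change (return `None`, log a warning, and guard in the callers) could look roughly like this. The enum members and the simplified `on_track_published` are assumptions made for the sketch; the real handlers receive SFU event objects and do more work.

```python
import logging
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


class StreamTrackType(Enum):
    """Hypothetical stand-in for the plugin's SFU track type enum."""
    AUDIO = 1
    VIDEO = 2
    FUTURE_KIND = 99  # a value the mapping does not know about


class TrackType(Enum):
    AUDIO = "audio"  # assumed member
    VIDEO = "video"


_TRACK_TYPE_MAP = {
    StreamTrackType.AUDIO: TrackType.AUDIO,
    StreamTrackType.VIDEO: TrackType.VIDEO,
}

track_map: dict = {}


def to_core_track_type(stream_type: StreamTrackType) -> Optional[TrackType]:
    core_type = _TRACK_TYPE_MAP.get(stream_type)
    if core_type is None:
        # Warn instead of raising so an async handler is not torn down.
        logger.warning("Skipping unrecognized track type: %s", stream_type)
    return core_type


def on_track_published(track_id: str, stream_type: StreamTrackType) -> None:
    track_type = to_core_track_type(stream_type)
    if track_type is None:
        return  # unknown types never reach the track map
    track_map[(track_id, track_type)] = {"track_id": track_id, "published": True}


on_track_published("t1", StreamTrackType.VIDEO)
on_track_published("t2", StreamTrackType.FUTURE_KIND)
```

Only the recognized track is stored, and the unknown one leaves a log line rather than an exception.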
🧹 Nitpick comments (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)

297-322: Use isinstance instead of hasattr for event type branching.

Line 313 uses hasattr(event.payload, "type") to distinguish TrackUnpublishedEvent from ParticipantLeftEvent. Since the type union is already declared on line 298, isinstance is both safer and compliant with the project's coding guidelines.

Proposed fix
-        if hasattr(event.payload, "type") and event.payload is not None:
+        if isinstance(event, sfu_events.TrackUnpublishedEvent) and event.payload is not None:
             # TrackUnpublishedEvent - single track
             tracks_to_remove = [event.payload.type]
             event_desc = "Track unpublished"

As per coding guidelines: "Avoid using getattr, hasattr, delattr and setattr; prefer normal attribute access in Python."

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/getstream/vision_agents/plugins/getstream/sfu_events.py (1)

1486-1507: ⚠️ Potential issue | 🟡 Minor

The participant property/setter pattern silently discards writes.

In the five event classes that have a participant property (e.g., ParticipantJoinedEvent, ParticipantLeftEvent, ParticipantUpdatedEvent, TrackPublishedEvent, TrackUnpublishedEvent), the setter stores to _participant but the getter never reads it — it always derives from self.payload. This means:

event.participant = some_participant  # stored in _participant, silently ignored
event.participant  # returns from payload, not from what was just set

The comment says "to satisfy dataclass __init__," which is fair, but any caller who writes to participant after construction gets a silent no-op. Consider at minimum logging a warning in the setter, or documenting this read-only-from-payload behavior more prominently.

Also applies to: 1535-1556, 1615-1636, 2090-2111, 2139-2167
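
The silent no-op, together with the suggested warning in the setter, can be reproduced with a small stand-in. `ParticipantEventSketch` is hypothetical and uses a plain class rather than the generated dataclass, so it only illustrates the getter/setter asymmetry.

```python
import warnings
from types import SimpleNamespace


class ParticipantEventSketch:
    """Hypothetical event whose participant is always read from the payload."""

    def __init__(self, payload) -> None:
        self.payload = payload
        self._participant = None

    @property
    def participant(self):
        # The getter never consults _participant, as the review describes.
        return self.payload.participant

    @participant.setter
    def participant(self, value) -> None:
        # Surface the otherwise silent no-op to the caller.
        warnings.warn(
            "participant is derived from payload; assignment is ignored",
            stacklevel=2,
        )
        self._participant = value


event = ParticipantEventSketch(SimpleNamespace(participant="from-payload"))
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    event.participant = "assigned"
current = event.participant
```

After the assignment the getter still returns the payload's value, but the caller now gets a warning instead of nothing.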

🧹 Nitpick comments (2)
plugins/getstream/vision_agents/plugins/getstream/sfu_events.py (2)

19-23: Bare except Exception in _to_dict — prefer a specific exception type.

The _to_dict helper swallows every Exception, silently returning {}. Protobuf's MessageToDict raises specific errors (e.g., AttributeError, ValueError). Catching broadly here can mask real bugs — a corrupt payload, a wrong type passed in — and you'd never know. The empty dict return is indistinguishable from a valid-but-empty message.

I understand this is auto-generated, so the fix belongs in the generator, but worth noting.

Suggested narrower catch
 def _to_dict(message) -> Dict[str, Any]:
     try:
         return MessageToDict(message, preserving_proto_field_name=True)
-    except Exception:
+    except (AttributeError, ValueError):
         return {}

As per coding guidelines, "Never write except Exception as e - use specific exception handling".


520-526: Pervasive getattr usage throughout all event classes.

Every __getattr__ fallback and every property accessor uses getattr(self.payload, ...) rather than direct attribute access. The coding guidelines say to avoid getattr; prefer self.payload.field_name with proper type narrowing. Since the protobuf message types do have these attributes as concrete fields, direct access would be both safer and more readable.

Again — auto-generated, so this is a generator-level concern. But for a file this large (2360 lines), the pattern compounds: a typo in a field name string literal silently returns None instead of raising.

As per coding guidelines, "Avoid using getattr, hasattr, delattr and setattr; prefer normal attribute access in Python".

Also applies to: 554-560, 603-609, 637-643, 679-685, 725-731, 756-762, 797-803, 835-841, 880-886, 918-924, 960-966, 994-1000, 1021-1027, 1056-1062, 1090-1096, 1131-1137, 1169-1175, 1217-1223, 1325-1331, 1386-1392, 1420-1426, 1471-1477, 1520-1526, 1569-1575, 1600-1606, 1649-1655, 1684-1690, 1718-1724, 1797-1803, 1986-1992, 2034-2040, 2075-2081, 2124-2130, 2180-2186, 2257-2263, 2313-2319
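
The typo risk described above is easy to reproduce. This is a small illustration with a hypothetical `PayloadSketch` dataclass standing in for a protobuf message; the field name is invented for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PayloadSketch:
    """Hypothetical payload with one concrete field."""

    session_id: str = "abc123"


def read_with_getattr(payload: PayloadSketch) -> Optional[str]:
    # Typo in the string literal: the default is returned, nothing fails.
    return getattr(payload, "sesion_id", None)


def read_directly(payload: PayloadSketch) -> str:
    # Direct access is checked by type checkers and fails loudly on a typo.
    return payload.session_id


payload = PayloadSketch()
silent = read_with_getattr(payload)

try:
    payload.sesion_id  # the same typo with direct attribute access
    raised = False
except AttributeError:
    raised = True
```

The getattr version hides the mistake behind a None, while direct access raises AttributeError at the faulty line.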

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (4)

124-128: ⚠️ Potential issue | 🟡 Minor

Avoid the broad except Exception as e; use specific exception types.

The coding guidelines prohibit except Exception as e. The RuntimeError case is already handled above; consider catching specific expected exceptions (e.g., ConnectionError, OSError) for the remaining case, or at minimum re-raising unexpected ones.

Suggested fix
         except RuntimeError as e:
             if "asynchronous generator" in str(e):
                 logger.debug(f"Ignoring async generator error during shutdown: {e}")
             else:
                 raise
-        except Exception as e:
-            logger.error(f"Error during connection close: {e}")
+        except OSError as e:
+            logger.error(f"Network error during connection close: {e}")

As per coding guidelines: "Never write except Exception as e - use specific exception handling."


585-589: ⚠️ Potential issue | 🟡 Minor

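Another broad except Exception as e; use specific exception types.

Same guideline violation. webbrowser.open normally signals failure by returning False; when it does raise, the documented exception is webbrowser.Error, and OSError can surface from launching the browser process.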

Suggested fix
-        except Exception as e:
-            logger.error(f"❌ Failed to open browser: {e}")
+        except OSError as e:
+            logger.error(f"❌ Failed to open browser: {e}")

As per coding guidelines: "Never write except Exception as e - use specific exception handling."
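
A slightly fuller sketch of the browser-opening path, under the assumption that the caller wants a boolean result. `open_demo_sketch` and the injectable `opener` parameter are hypothetical and exist only so the example can run without launching a browser; `webbrowser.open` and `webbrowser.Error` are standard-library names.

```python
import logging
import webbrowser
from typing import Callable

logger = logging.getLogger("open_demo_sketch")


def open_demo_sketch(url: str, opener: Callable[[str], bool] = webbrowser.open) -> bool:
    """Try to open url; return True only if a browser was launched."""
    try:
        opened = opener(url)
    except (webbrowser.Error, OSError) as exc:
        logger.error("Failed to open browser: %s", exc)
        return False
    if not opened:
        # webbrowser.open reports most failures through its return value.
        logger.error("No browser could be launched for %s", url)
    return opened


def _no_browser(url: str) -> bool:
    """Fake opener that fails the way a missing browser might."""
    raise webbrowser.Error("no runnable browser")


result_when_raising = open_demo_sketch("https://example.com", opener=_no_browser)
result_when_false = open_demo_sketch("https://example.com", opener=lambda url: False)
```

Checking the return value matters here because a narrow except clause alone would miss the common failure case where no exception is raised.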


311-316: ⚠️ Potential issue | 🟡 Minor

Avoid hasattr — use structural type discrimination instead.

The coding guidelines prohibit hasattr. Since _on_track_removed already knows the two concrete event types (TrackUnpublishedEvent and ParticipantLeftEvent), use isinstance to discriminate:

Suggested fix
-        if hasattr(event.payload, "type") and event.payload is not None:
-            # TrackUnpublishedEvent - single track
-            tracks_to_remove = [event.payload.type]
-            event_desc = "Track unpublished"
+        if isinstance(event, sfu_events.TrackUnpublishedEvent):
+            tracks_to_remove = [event.payload.type]
+            event_desc = "Track unpublished"
         else:
             # ParticipantLeftEvent - all published tracks

As per coding guidelines: "Avoid using getattr, hasattr, delattr and setattr; prefer normal attribute access in Python."
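
The same isinstance-based discrimination, shown as a self-contained sketch. The two event classes and their fields are hypothetical simplifications; the real SFU events carry protobuf payloads.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class TrackUnpublishedSketch:
    """Hypothetical event for a single unpublished track."""

    track_type: int


@dataclass
class ParticipantLeftSketch:
    """Hypothetical event for a participant leaving with several tracks."""

    published_tracks: List[int] = field(default_factory=list)


def tracks_to_remove(
    event: Union[TrackUnpublishedSketch, ParticipantLeftSketch],
) -> List[int]:
    if isinstance(event, TrackUnpublishedSketch):
        # Single track: the type checker narrows event to this class here.
        return [event.track_type]
    # Remaining case: every track the participant had published.
    return list(event.published_tracks)


single = tracks_to_remove(TrackUnpublishedSketch(track_type=2))
everything = tracks_to_remove(ParticipantLeftSketch(published_tracks=[1, 2]))
```

Because each branch is tied to a concrete class, a renamed field shows up as a type error instead of silently taking the else branch.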


383-449: ⚠️ Potential issue | 🟡 Minor

Address context manager cleanup and fix coding guideline violations.

The concern about __aenter__() at line 441 is partially valid: while the code manually enters the context manager without explicitly calling __aexit__(), proper cleanup exists through StreamConnection.close() → connection.leave(). However, mixing explicit context manager entry with manual lifecycle management is unconventional. Ensure the cleanup contract is clearly documented.

Additionally, fix two coding guideline violations:

  • Line 126: Replace except Exception as e with specific exception types per guidelines (currently catches too broadly).
  • Line 313: Replace hasattr(event.payload, "type") with direct attribute access per guidelines.
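
If the manual __aenter__() call is kept, one option is to route it through contextlib.AsyncExitStack so that close() always runs the matching exit. This is a sketch under that assumption; `FakeConnectionManager` and `ConnectionSketch` are hypothetical and do not mirror the actual StreamConnection API.

```python
import asyncio
from contextlib import AsyncExitStack
from typing import Optional


class FakeConnectionManager:
    """Hypothetical async context manager standing in for the SFU connection."""

    def __init__(self) -> None:
        self.entered = False
        self.left = False

    async def __aenter__(self) -> "FakeConnectionManager":
        self.entered = True
        return self

    async def __aexit__(self, exc_type, exc, tb) -> Optional[bool]:
        self.left = True
        return None


class ConnectionSketch:
    """Owns the manager's lifetime through an exit stack."""

    def __init__(self, manager: FakeConnectionManager) -> None:
        self._manager = manager
        self._stack = AsyncExitStack()

    async def open(self) -> None:
        # Enters the context manager and registers its __aexit__ on the stack.
        await self._stack.enter_async_context(self._manager)

    async def close(self) -> None:
        # Runs every registered exit callback, including __aexit__.
        await self._stack.aclose()


async def demo() -> FakeConnectionManager:
    manager = FakeConnectionManager()
    connection = ConnectionSketch(manager)
    await connection.open()
    await connection.close()
    return manager


manager = asyncio.run(demo())
```

The exit stack keeps entry and exit paired without forcing the caller into an `async with` block.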
🧹 Nitpick comments (2)
agents-core/vision_agents/core/edge/edge_transport.py (1)

130-136: create_conversation uses Call instead of T_Call, and both methods here lack docstrings.

join and create_call use the generic T_Call, but create_conversation on Line 131 accepts the bare Call protocol. This inconsistency means a subclass parameterized with a specific call type (e.g., StreamCall) won't get type-checked correctly at this boundary. Also, create_conversation and add_track_subscriber are the only abstract methods without docstrings.

Suggested fix
     @abc.abstractmethod
-    async def create_conversation(self, call: Call, user: User, instructions: str):
+    async def create_conversation(self, call: T_Call, user: User, instructions: str):
+        """Create a conversation channel associated with the call.
+
+        Args:
+            call: The call to associate the conversation with.
+            user: The user creating the conversation.
+            instructions: System instructions for the conversation.
+        """
         pass
 
     @abc.abstractmethod
     def add_track_subscriber(self, track_id: str) -> Optional[aiortc.VideoStreamTrack]:
+        """Subscribe to a video track by ID.
+
+        Args:
+            track_id: The ID of the track to subscribe to.
+
+        Returns:
+            The video stream track if available, or None.
+        """
         pass
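
To illustrate why the generic parameter matters at this boundary, here is a reduced sketch of a transport ABC parameterized by call type. All names ending in `Sketch` are hypothetical, the method signatures are simplified, and the string return value of `create_conversation` is invented for the example.

```python
import abc
import asyncio
from dataclasses import dataclass
from typing import Generic, Protocol, TypeVar


class CallSketch(Protocol):
    """Hypothetical minimal call protocol."""

    id: str


T_CallSketch = TypeVar("T_CallSketch", bound=CallSketch)


class EdgeTransportSketch(abc.ABC, Generic[T_CallSketch]):
    @abc.abstractmethod
    async def create_call(self, call_id: str) -> T_CallSketch:
        """Create a call object for the given ID."""

    @abc.abstractmethod
    async def create_conversation(self, call: T_CallSketch, instructions: str) -> str:
        """Create a conversation tied to the call."""


@dataclass
class StreamCallSketch:
    """Hypothetical concrete call type with an extra field."""

    id: str
    region: str = "eu"


class StreamEdgeSketch(EdgeTransportSketch[StreamCallSketch]):
    async def create_call(self, call_id: str) -> StreamCallSketch:
        return StreamCallSketch(id=call_id)

    async def create_conversation(self, call: StreamCallSketch, instructions: str) -> str:
        # call.region is visible to the type checker because the parameter
        # uses the concrete call type rather than the bare protocol.
        return f"{call.region}:{call.id}"


async def demo() -> str:
    edge = StreamEdgeSketch()
    call = await edge.create_call("room-1")
    return await edge.create_conversation(call, "be brief")


channel = asyncio.run(demo())
```

Had the base method accepted the bare protocol, the subclass override taking `StreamCallSketch` would narrow the parameter type, which type checkers generally report as an incompatible override.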
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)

275-286: Dead guard: if not is_agent_track is always True here.

The early return on lines 221-225 already exits when is_agent_track is True, so by the time execution reaches line 277, is_agent_track is guaranteed to be False. This condition is dead code: it reads as meaningful but never gates anything.

Suggested fix
         if track_id:
             # Store with correct type from SFU
             self._track_map[track_key] = {"track_id": track_id, "published": True}
 
-            # Only emit TrackAddedEvent for remote participants, not for agent's own tracks
-            if not is_agent_track:
-                # NOW spawn TrackAddedEvent with correct type
-                self.events.send(
-                    events.TrackAddedEvent(
-                        plugin_name="getstream",
-                        track_id=track_id,
-                        track_type=track_type,
-                        participant=_to_core_participant(event.participant),
-                    )
-                )
+            # Emit TrackAddedEvent with correct type from SFU
+            self.events.send(
+                events.TrackAddedEvent(
+                    plugin_name="getstream",
+                    track_id=track_id,
+                    track_type=track_type,
+                    participant=_to_core_participant(event.participant),
+                )
+            )
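
A reduced model of the control flow shows that removing the guard does not change behavior. The two functions below are hypothetical stand-ins for the handler before and after the suggested change.

```python
from typing import Callable, List


def with_guard(is_agent_track: bool, emitted: List[str]) -> None:
    if is_agent_track:
        return  # early return, as in the handler
    if not is_agent_track:  # always True at this point
        emitted.append("TrackAddedEvent")


def without_guard(is_agent_track: bool, emitted: List[str]) -> None:
    if is_agent_track:
        return
    emitted.append("TrackAddedEvent")


def outputs(handler: Callable[[bool, List[str]], None]) -> List[List[str]]:
    """Run the handler for both flag values and collect what it emitted."""
    results = []
    for flag in (True, False):
        emitted: List[str] = []
        handler(flag, emitted)
        results.append(emitted)
    return results


guarded = outputs(with_guard)
unguarded = outputs(without_guard)
```

Both versions emit nothing for agent tracks and one event for remote tracks, so the inner condition can go.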

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
agents-core/vision_agents/core/agents/agents.py (1)

992-995: 🛠️ Refactor suggestion | 🟠 Major

getattr on Participant — prefer direct attribute access.

participant is typed as Participant (a dataclass with a user_id field). The defensive getattr(participant, "user_id", None) is unnecessary and violates the coding guidelines. Use direct access instead.

♻️ Proposed fix
                    if (
                        participant
-                        and getattr(participant, "user_id", None) != self.agent_user.id
+                        and participant.user_id != self.agent_user.id
                    ):

As per coding guidelines: "Avoid using getattr, hasattr, delattr and setattr; prefer normal attribute access in Python".

🧹 Nitpick comments (3)
agents-core/vision_agents/core/agents/agents.py (3)

718-729: getattr and broad except Exception violate coding guidelines.

Lines 720/722 use getattr for dynamic dispatch, and Line 726 catches the broad Exception. The coding guidelines explicitly prohibit both patterns. Consider defining a Lifecycle protocol/ABC with explicit start/stop/close methods that subclasses implement, replacing the string-based dispatch. If the dynamic approach must stay, at minimum narrow the exception type.

♻️ Sketch: narrowed exception handling (dynamic dispatch kept)
-    async def _apply(self, function_name: str, *args, **kwargs):
-        subclasses = [...]
-        for subclass in subclasses:
-            if (
-                subclass is not None
-                and getattr(subclass, function_name, None) is not None
-            ):
-                func = getattr(subclass, function_name)
-                if func is not None:
-                    try:
-                        await await_or_run(func, *args, **kwargs)
-                    except Exception as e:
-                        self.logger.exception(...)
+    async def _apply(self, function_name: str, *args, **kwargs):
+        subclasses = [...]
+        for subclass in subclasses:
+            if subclass is None:
+                continue
+            func = getattr(subclass, function_name, None)  # dynamic dispatch - unavoidable here
+            if func is not None:
+                try:
+                    await await_or_run(func, *args, **kwargs)
+                except (RuntimeError, ConnectionError, OSError) as e:
+                    self.logger.exception(
+                        f"Error calling {function_name} on {subclass.__class__.__name__}: {e}"
+                    )

As per coding guidelines: "Avoid using getattr, hasattr, delattr and setattr; prefer normal attribute access in Python" and "Never write except Exception as e - use specific exception handling".
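
For comparison, a protocol-based variant could look roughly like the following. This is only a sketch: the `Lifecycle` protocol, `ComponentSketch`, and `close_all` are hypothetical, and the real components may not share a single method signature.

```python
import asyncio
import logging
from typing import List, Optional, Protocol, Sequence

logger = logging.getLogger("lifecycle_sketch")


class Lifecycle(Protocol):
    """Hypothetical protocol every managed component would implement."""

    async def close(self) -> None: ...


class ComponentSketch:
    """Hypothetical component that records or fails its close call."""

    def __init__(self, name: str, log: List[str], fail: bool = False) -> None:
        self.name = name
        self.log = log
        self.fail = fail

    async def close(self) -> None:
        if self.fail:
            raise ConnectionError(f"{self.name} lost its connection")
        self.log.append(f"{self.name}.close")


async def close_all(components: Sequence[Optional[Lifecycle]]) -> None:
    for component in components:
        if component is None:
            continue
        try:
            # Plain method call: no string-based lookup needed.
            await component.close()
        except (RuntimeError, ConnectionError, OSError):
            logger.exception("Error closing %s", type(component).__name__)


calls: List[str] = []
asyncio.run(
    close_all(
        [
            ComponentSketch("stt", calls),
            None,
            ComponentSketch("tts", calls, fail=True),
            ComponentSketch("llm", calls),
        ]
    )
)
```

One failing component is logged and skipped while the others still close, and the method name is checked statically rather than passed as a string.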


919-931: Broad except Exception pattern persists across multiple handlers.

This handler (and several others at Lines 801, 1028, 1044, 1061) catches the broad Exception. The coding guidelines require specific exception types. While these are largely pre-existing, the _on_agent_say handler in particular could mask TTS-specific errors that deserve different handling (e.g., connection errors vs. encoding errors).

As per coding guidelines: "Never write except Exception as e - use specific exception handling".


1421-1429: Inconsistent use of TrackType.VIDEO.value vs. TrackType enum elsewhere.

Line 1423 passes TrackType.VIDEO.value (an int) for the type field, while _on_track_added at Lines 1158-1160 passes the TrackType enum directly. Since TrackType is an IntEnum this won't break equality checks, but the inconsistency is confusing. Prefer the enum form for consistency.

♻️ Proposed fix
-                type=TrackType.VIDEO.value,
+                type=TrackType.VIDEO,
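
The IntEnum behavior behind this nitpick can be checked in isolation. `TrackTypeSketch` is a hypothetical enum with made-up values; the real TrackType members and numbers may differ.

```python
from enum import IntEnum


class TrackTypeSketch(IntEnum):
    """Hypothetical stand-in for the core TrackType enum."""

    AUDIO = 1
    VIDEO = 2


as_enum = TrackTypeSketch.VIDEO
as_int = TrackTypeSketch.VIDEO.value

equal = as_enum == as_int          # IntEnum members compare equal to their ints
int_is_plain = type(as_int) is int  # .value drops the enum type
enum_keeps_name = as_enum.name      # the member still knows its name
```

Equality holds either way, but only the enum form keeps the member name available for logging and type checks.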

@dangusev dangusev merged commit e919787 into main Feb 6, 2026
10 checks passed
@dangusev dangusev deleted the chore/getstream-decouple branch February 6, 2026 12:05