feat(voice): Add v2 spike handler for 45-min behavioral interviews#58
Conversation
Design for handler_v2_spike.py addressing all gaps from the voice handler gap report: session resumption, context compression, GoAway handling, wall clock timer, five termination reasons, and transcript suppression. Validated through two rounds of spec review. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add transcript_buffer field to VoiceSessionState for evaluation - Heartbeat ConnectionError now calls _handle_termination(DISCONNECTED) - Flag session state warning approach as "likely won't work" - Use RPS_ prefix on sentinels to avoid false positives - Guard WebSocket send in _handle_termination with try/except - Compression config gracefully skipped if SDK lacks support Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
11-task TDD plan covering config, dataclasses, event classifier, termination handler, timer, heartbeat, event router, client receiver, orchestrator, entry point, and final verification. Validated through plan review with fixes for imports, stats counter, and spec deviation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…persistence Implements _send_to_client routing audio/transcript/turn_status/session_resumption/go_away/unknown ADK events; transcripts are logged but not forwarded to frontend (REQ-7). Also fixes _make_state factory to use AsyncMock for chat_logger so awaitable calls work correctly in all tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements _receive_from_client for VoiceHandlerV2: routes audio/pcm blobs to send_realtime, text/plain to send_content with logging, end_session to USER_ENDED termination, and WebSocketDisconnect to DISCONNECTED termination. Invalid JSON increments error stats. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…full test suite Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… handling - Extract _Blob/_Content/_Part stubs to module level (was duplicated 3x) - Fix `except (ConnectionError, Exception)` → split into proper blocks - Use setdefault+increment for stats instead of .get() fallback - Share _make_event() test helper at module level across test classes - Simplify TestSendToClient mock setup using shared helper Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a new, experimental voice handler (V2 spike) specifically designed for 45-minute behavioral interviews. The primary goal is to address several critical limitations identified in the previous voice handler, focusing on improving session stability, resilience, and intelligent interaction with the underlying ADK Live API. Key improvements include advanced session state management, robust error handling, and a modular, concurrent architecture that allows for better control over the streaming lifecycle. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a prototype v2 voice handler, which is a significant feature addition. The implementation is well-structured, particularly with the use of a typed session state dataclass and the separation of concerns in event processing. The asynchronous task orchestration is robust. My review focuses on improving type safety, consistency with existing project utilities, and ensuring test accuracy. I've identified a few areas for improvement, including leveraging Pydantic models for request validation, standardizing imports and utility function usage, and refining a test mock to better reflect production behavior.
| async def _receive_from_client( | ||
| self, | ||
| websocket, | ||
| state: VoiceSessionState, | ||
| env_info, | ||
| ) -> None: | ||
| """Receive from client and forward to ADK.""" | ||
| try: | ||
| while not state.stop_event.is_set(): | ||
| data = await websocket.receive_text() | ||
|
|
||
| try: | ||
| request = json.loads(data) | ||
| except (json.JSONDecodeError, ValueError) as e: | ||
| logger.warning(f"Invalid JSON from client: {e}") | ||
| state.stats.setdefault("errors", 0) | ||
| state.stats["errors"] += 1 | ||
| continue | ||
|
|
||
| mime_type = request.get("mime_type", "") | ||
| raw_data = request.get("data", "") | ||
| end_session = request.get("end_session", False) | ||
|
|
||
| if end_session: | ||
| await self._handle_termination( | ||
| websocket, state, VoiceConfigV2.REASON_USER_ENDED, | ||
| ) | ||
| return | ||
|
|
||
| try: | ||
| decoded = base64.b64decode(raw_data) | ||
| except Exception: | ||
| state.stats.setdefault("errors", 0) | ||
| state.stats["errors"] += 1 | ||
| continue | ||
|
|
||
| try: | ||
| if mime_type == "audio/pcm": | ||
| # PCM logging in non-production | ||
| if not getattr(env_info, "is_production", True): | ||
| try: | ||
| await state.chat_logger.log_pcm_audio( | ||
| user_id=state.user_id, | ||
| session_id=state.session_id, | ||
| audio_data=decoded, | ||
| ) | ||
| except (AttributeError, Exception) as e: | ||
| logger.debug(f"PCM logging skipped: {e}") | ||
|
|
||
| blob = _Blob(mime_type=mime_type, data=decoded) | ||
| state.live_request_queue.send_realtime(blob) | ||
| state.stats.setdefault("audio_chunks_sent", 0) | ||
| state.stats["audio_chunks_sent"] += 1 | ||
|
|
||
| elif mime_type == "text/plain": | ||
| text_data = decoded.decode("utf-8") | ||
| await state.chat_logger.log_message( | ||
| user_id=state.user_id, | ||
| session_id=state.session_id, | ||
| role="user", | ||
| content=text_data, | ||
| message_number=-1, | ||
| ) | ||
| content = _Content(parts=[_Part(text=text_data)]) | ||
| state.live_request_queue.send_content(content) | ||
| state.stats.setdefault("text_chunks_sent", 0) | ||
| state.stats["text_chunks_sent"] += 1 | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error processing client input: {e}") | ||
| state.stats.setdefault("errors", 0) | ||
| state.stats["errors"] += 1 | ||
|
|
||
| except WebSocketDisconnect: | ||
| logger.info(f"Client disconnected from session {state.session_id}") | ||
| await self._handle_termination( | ||
| websocket, state, VoiceConfigV2.REASON_DISCONNECTED, | ||
| ) | ||
| except Exception as e: | ||
| logger.error(f"Error in _receive_from_client: {e}") | ||
| await self._handle_termination( | ||
| websocket, state, VoiceConfigV2.REASON_DISCONNECTED, | ||
| ) | ||
|
|
There was a problem hiding this comment.
The current implementation manually parses the incoming JSON and uses dict.get() to extract fields. This bypasses the Pydantic VoiceRequest model which is designed for this purpose and includes validation for mime_type and data decoding. Using the Pydantic model would make the code safer, more readable, and consistent with the project's standards. Please also move the from .models import VoiceRequest import to the top of the file.
async def _receive_from_client(
self,
websocket,
state: VoiceSessionState,
env_info,
) -> None:
"""Receive from client and forward to ADK."""
try:
while not state.stop_event.is_set():
data = await websocket.receive_text()
try:
from .models import VoiceRequest
request = VoiceRequest.model_validate_json(data)
except (json.JSONDecodeError, ValueError) as e:
logger.warning(f"Invalid JSON from client: {e}")
state.stats.setdefault("errors", 0)
state.stats["errors"] += 1
continue
if request.end_session:
await self._handle_termination(
websocket, state, VoiceConfigV2.REASON_USER_ENDED,
)
return
try:
decoded = request.decode_data()
except ValueError as e:
logger.warning(f"Data decoding error: {e}")
state.stats.setdefault("errors", 0)
state.stats["errors"] += 1
continue
try:
if request.mime_type == "audio/pcm":
# PCM logging in non-production
if not getattr(env_info, "is_production", True):
try:
await state.chat_logger.log_pcm_audio(
user_id=state.user_id,
session_id=state.session_id,
audio_data=decoded,
)
except (AttributeError, Exception) as e:
logger.debug(f"PCM logging skipped: {e}")
blob = _Blob(mime_type=request.mime_type, data=decoded)
state.live_request_queue.send_realtime(blob)
state.stats.setdefault("audio_chunks_sent", 0)
state.stats["audio_chunks_sent"] += 1
elif request.mime_type == "text/plain":
text_data = decoded
await state.chat_logger.log_message(
user_id=state.user_id,
session_id=state.session_id,
role="user",
content=text_data,
message_number=-1,
)
content = _Content(parts=[_Part(text=text_data)])
state.live_request_queue.send_content(content)
state.stats.setdefault("text_chunks_sent", 0)
state.stats["text_chunks_sent"] += 1
except Exception as e:
logger.error(f"Error processing client input: {e}")
state.stats.setdefault("errors", 0)
state.stats["errors"] += 1
except WebSocketDisconnect:
logger.info(f"Client disconnected from session {state.session_id}")
await self._handle_termination(
websocket, state, VoiceConfigV2.REASON_DISCONNECTED,
)
except Exception as e:
logger.error(f"Error in _receive_from_client: {e}")
await self._handle_termination(
websocket, state, VoiceConfigV2.REASON_DISCONNECTED,
)| "type": "session_ended", | ||
| "reason": reason, | ||
| "detail": detail, | ||
| "timestamp": datetime.now(timezone.utc).isoformat(), |
There was a problem hiding this comment.
For consistency with the existing codebase and to centralize time-related logic, please use the utc_now_isoformat utility function from role_play.common.time_utils instead of datetime.now(timezone.utc).isoformat(). This applies to several places in this file (e.g., lines 325, 360, 402, 658, 678). Please also add the import from ..common.time_utils import utc_now_isoformat at the top of the file.
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| "timestamp": utc_now_isoformat(), |
| try: | ||
| from ..common.exceptions import TokenExpiredError, AuthenticationError | ||
| from ..server.dependencies import get_auth_manager | ||
| auth_manager = get_auth_manager(storage) | ||
| token_data = auth_manager.verify_token(token) | ||
| user = await storage.get_user(token_data.user_id) | ||
| return user | ||
| except Exception as e: | ||
| logger.error(f"JWT validation error: {e}") | ||
| return None |
There was a problem hiding this comment.
| async def fast_receive(*a, **kw): | ||
| state.stop_event.set() | ||
| state.termination_reason = "USER_ENDED" |
There was a problem hiding this comment.
This mock for fast_receive manually sets stop_event and termination_reason. To make the test more accurately reflect the production logic, it would be better to have the mock call handler._handle_termination, which is the designated single point for session termination.
| async def fast_receive(*a, **kw): | |
| state.stop_event.set() | |
| state.termination_reason = "USER_ENDED" | |
| async def fast_receive(*a, **kw): | |
| await handler._handle_termination(ws, state, "USER_ENDED") |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 542d417a15
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| handle_key = VoiceConfigV2.HANDLE_KEY_TEMPLATE.format( | ||
| user_id=state.user_id, session_id=state.session_id, | ||
| ) | ||
| await self._storage.write(handle_key, classification.data) |
There was a problem hiding this comment.
Implement the read side of session resumption
This only persists the Gemini resumption handle; I couldn't find any corresponding read/use path in handler_v2_spike.py (repo-wide search for HANDLE_KEY_TEMPLATE only returns this write and the constant). If a websocket drops mid-interview, the reconnect path has no way to pass the saved handle back into ADK, so the user starts a fresh live session instead of resuming the existing one.
Useful? React with 👍 / 👎.
| await asyncio.wait_for( | ||
| state.stop_event.wait(), | ||
| timeout=state.warning_seconds, |
There was a problem hiding this comment.
Enforce the timeout once the session is already past warning
If elapsed >= session_timeout - warning_seconds—for example with any custom timeout shorter than 5 minutes, or after reconnecting to a session that's already near/over the 45-minute cap—phase 1 times out immediately and this branch still waits the full warning_seconds. That lets expired sessions continue for up to another 300 seconds before TIME_LIMIT fires, which breaks the advertised absolute wall-clock limit.
Useful? React with 👍 / 👎.
| # Note: _initialize_adk would be called here in production | ||
| # For the spike tests, we test the auth flow only | ||
| # state = await self._initialize_adk(...) | ||
| # await self._handle_streaming(websocket, state, env_info) |
There was a problem hiding this comment.
Start ADK streaming after successful authentication
After auth succeeds, handle_voice_session() sends the initial connecting status and then returns because the ADK initialization and _handle_streaming() call are commented out. Any client that tries this handler will get a stuck connection with no config/ready message and none of the v2 timer, transcript, or termination logic will ever execute.
Useful? React with 👍 / 👎.
Four smoke tests validating real API assumptions: 1. Type stub compatibility (_Blob/_Content/_Part vs google.genai types) 2. Live session round-trip (send text, receive audio/transcript) 3. Session resumption handle emission 4. classify_adk_event against real Gemini events Excluded from default test runs via pytest marker (-m "not smoke"). Run with: GOOGLE_API_KEY=<key> pytest -m smoke test/python/smoke/ -v -s Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
Use gemini-2.5-flash-live-preview (matching v1 handler) instead of non-existent gemini-2.5-flash-preview-native-audio-dialog. Model is now configurable via GEMINI_LIVE_MODEL env var. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
…o-preview-12-2025 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
…w_isoformat - Fix session timer edge case where reconnecting to a near-expired session could run 300s past the wall clock limit (Codex #2). Timer now terminates immediately if already past deadline, and caps phase 2 wait to actual remaining time. - Replace manual JSON parsing with VoiceRequest Pydantic model in _receive_from_client for validation consistency (Gemini #1). - Replace datetime.now(timezone.utc).isoformat() with utc_now_isoformat() from time_utils across 7 call sites (Gemini #2). - Add 2 new timer tests: already-expired session, past-warning-threshold. 54 tests passing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
- Add _initialize_adk() method that creates Runner, LiveRequestQueue,
and returns a typed VoiceSessionState
- Add router property with WebSocket endpoint at /voice-v2/ws/{session_id}
- Extend BaseHandler for compatibility with dynamic handler registration
- Register voice_v2 handler in dev.yaml config
- Entry point now runs: auth → ADK init → config → streaming → cleanup
- Constructor accepts optional storage (injectable for tests, defaults
to get_storage_backend() for production)
Uses gemini-2.5-flash-native-audio-preview-12-2025 model.
54 unit tests + 20 v1 tests passing.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
Summary
Prototype voice handler (
handler_v2_spike.py) addressing all gaps from the voice handler gap report:StorageBackend(REQ-1/4)_handle_termination()(REQ-5)Architecture highlights
_receive_from_client,_send_to_client,_session_timer,_heartbeatorchestrated viaasyncio.wait(FIRST_COMPLETED)classify_adk_event()→_send_to_client()routingVoiceSessionStatedataclass replaces v1'sDict[str, Any]_Blob/_Content/_Partavoid hard google.genai dependency in testsFiles
src/python/role_play/voice/handler_v2_spike.pytest/python/unit/voice/test_handler_v2_spike.pyTest plan
pytest -o "addopts=" test/python/unit/voice/)🤖 Generated with Claude Code