From dabccc015bf2bee3f33e320b10f8606ebd9974d6 Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Thu, 29 Jan 2026 14:49:02 +0000 Subject: [PATCH] Enhance StreamingTranscriptionConsumer and conversation job handling - Removed cumulative audio offset tracking from StreamingTranscriptionConsumer as Deepgram provides cumulative timestamps directly. - Updated store_final_result method to utilize Deepgram's cumulative timestamps without adjustments. - Implemented completion signaling for transcription sessions in Redis, ensuring conversation jobs wait for all results before processing. - Improved error handling to signal completion even in case of errors, preventing conversation jobs from hanging. - Enhanced logging for better visibility of transcription completion and error states. --- .../transcription/streaming_consumer.py | 72 +++++++------------ .../workers/conversation_jobs.py | 24 +++++++ 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py b/backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py index b6c05ae8..a58e886a 100644 --- a/backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py +++ b/backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py @@ -125,7 +125,6 @@ async def start_session_stream(self, session_id: str, sample_rate: int = 16000): self.active_sessions[session_id] = { "last_activity": time.time(), "sample_rate": sample_rate, - "audio_offset_seconds": 0.0 # Track cumulative audio duration for timestamp adjustment } logger.info(f"🎙️ Started Deepgram WebSocket stream for session: {session_id}") @@ -164,10 +163,22 @@ async def end_session_stream(self, session_id: str): await self.trigger_plugins(session_id, final_result) self.active_sessions.pop(session_id, None) - logger.info(f"🛑 Ended Deepgram WebSocket stream for session: {session_id}") + + # Signal that streaming transcription is complete for this session + # This allows conversation_jobs to wait for all results before reading transcript + completion_key = f"transcription:complete:{session_id}" + await self.redis_client.set(completion_key, "1", ex=300) # 5 min TTL + logger.info(f"✅ Streaming transcription complete for {session_id} (signal set)") except Exception as e: logger.error(f"Error ending stream for {session_id}: {e}", exc_info=True) + # Still signal completion even on error so conversation job doesn't hang + try: + completion_key = f"transcription:complete:{session_id}" + await self.redis_client.set(completion_key, "error", ex=300) + logger.warning(f"⚠️ Set error completion signal for {session_id}") + except Exception: + pass # Best effort async def process_audio_chunk(self, session_id: str, audio_chunk: bytes, chunk_id: str): """ @@ -250,11 +261,11 @@ async def publish_to_client(self, session_id: str, result: Dict, is_final: bool) async def store_final_result(self, session_id: str, result: Dict, chunk_id: str = None): """ - Store final transcription result to Redis Stream with cumulative timestamp adjustment. + Store final transcription result to Redis Stream. - Transcription providers return word timestamps that reset to 0 for each chunk. - We maintain a running audio_offset_seconds to make timestamps cumulative across - the session, enabling accurate speech duration calculation for speech detection. + Note: Deepgram streaming WebSocket maintains state and returns cumulative + timestamps from the start of the stream. No offset adjustment is needed. + Previous code incorrectly assumed per-chunk timestamps starting at 0. Args: session_id: Session ID @@ -264,38 +275,9 @@ async def store_final_result(self, session_id: str, result: Dict, chunk_id: str try: stream_name = f"transcription:results:{session_id}" - # Get cumulative audio offset for this session - audio_offset = 0.0 - chunk_duration = 0.0 - if session_id in self.active_sessions: - audio_offset = self.active_sessions[session_id].get("audio_offset_seconds", 0.0) - - # Adjust word timestamps by cumulative offset + # Get words and segments directly - Deepgram returns cumulative timestamps words = result.get("words", []) - adjusted_words = [] - if words: - for word in words: - adjusted_word = word.copy() - adjusted_word["start"] = word.get("start", 0.0) + audio_offset - adjusted_word["end"] = word.get("end", 0.0) + audio_offset - adjusted_words.append(adjusted_word) - - # Calculate chunk duration from last word's end time - if adjusted_words: - last_word_end = words[-1].get("end", 0.0) # Use unadjusted for duration calc - chunk_duration = last_word_end - - logger.debug(f"➡️ [STREAMING] Adjusted {len(adjusted_words)} words by +{audio_offset:.1f}s (chunk_duration={chunk_duration:.1f}s)") - - # Adjust segment timestamps too segments = result.get("segments", []) - adjusted_segments = [] - if segments: - for seg in segments: - adjusted_seg = seg.copy() - adjusted_seg["start"] = seg.get("start", 0.0) + audio_offset - adjusted_seg["end"] = seg.get("end", 0.0) + audio_offset - adjusted_segments.append(adjusted_seg) # Prepare result entry - MUST match aggregator's expected schema # All keys and values must be bytes to match consumer.py format @@ -308,23 +290,17 @@ async def store_final_result(self, session_id: str, result: Dict, chunk_id: str b"timestamp": str(time.time()).encode(), } - # Add adjusted JSON fields - if adjusted_words: - entry[b"words"] = json.dumps(adjusted_words).encode() + # Add words and segments directly (already have cumulative timestamps from Deepgram) + if words: + entry[b"words"] = json.dumps(words).encode() - if adjusted_segments: - entry[b"segments"] = json.dumps(adjusted_segments).encode() + if segments: + entry[b"segments"] = json.dumps(segments).encode() # Write to Redis Stream await self.redis_client.xadd(stream_name, entry) - # Update cumulative offset for next chunk - if session_id in self.active_sessions and chunk_duration > 0: - self.active_sessions[session_id]["audio_offset_seconds"] += chunk_duration - new_offset = self.active_sessions[session_id]["audio_offset_seconds"] - logger.info(f"💾 Stored final result to {stream_name}: {result.get('text', '')[:50]}... (offset: {audio_offset:.1f}s → {new_offset:.1f}s)") - else: - logger.info(f"💾 Stored final result to {stream_name}: {result.get('text', '')[:50]}...") + logger.info(f"💾 Stored final result to {stream_name}: {result.get('text', '')[:50]}... ({len(words)} words)") except Exception as e: logger.error(f"Error storing final result for {session_id}: {e}", exc_info=True) diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index fd5875e2..fec74f19 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -593,6 +593,30 @@ async def open_conversation_job( # to avoid false negatives from aggregated results lacking proper word-level data logger.info("✅ Conversation has meaningful speech (validated during streaming), proceeding with post-processing") + # Wait for streaming transcription consumer to complete before reading transcript + # This fixes the race condition where conversation job reads transcript before + # streaming consumer stores all final results (seen as 24+ second delay in logs) + completion_key = f"transcription:complete:{session_id}" + max_wait_streaming = 30 # seconds + waited_streaming = 0.0 + while waited_streaming < max_wait_streaming: + completion_status = await redis_client.get(completion_key) + if completion_status: + status_str = completion_status.decode() if isinstance(completion_status, bytes) else completion_status + if status_str == "error": + logger.warning(f"⚠️ Streaming transcription ended with error for {session_id}, proceeding anyway") + else: + logger.info(f"✅ Streaming transcription confirmed complete for {session_id}") + break + await asyncio.sleep(0.5) + waited_streaming += 0.5 + + if waited_streaming >= max_wait_streaming: + logger.warning( + f"⚠️ Timed out waiting for streaming completion signal for {session_id} " + f"(waited {max_wait_streaming}s), proceeding with available transcript" + ) + # Wait for audio_streaming_persistence_job to complete and write MongoDB chunks from advanced_omi_backend.utils.audio_chunk_utils import wait_for_audio_chunks