@@ -125,7 +125,6 @@ async def start_session_stream(self, session_id: str, sample_rate: int = 16000):
self.active_sessions[session_id] = {
"last_activity": time.time(),
"sample_rate": sample_rate,
"audio_offset_seconds": 0.0 # Track cumulative audio duration for timestamp adjustment
}

logger.info(f"🎙️ Started Deepgram WebSocket stream for session: {session_id}")
@@ -164,10 +163,22 @@ async def end_session_stream(self, session_id: str):
await self.trigger_plugins(session_id, final_result)

self.active_sessions.pop(session_id, None)
logger.info(f"🛑 Ended Deepgram WebSocket stream for session: {session_id}")

# Signal that streaming transcription is complete for this session
# This allows conversation_jobs to wait for all results before reading transcript
completion_key = f"transcription:complete:{session_id}"
await self.redis_client.set(completion_key, "1", ex=300) # 5 min TTL
logger.info(f"✅ Streaming transcription complete for {session_id} (signal set)")

except Exception as e:
logger.error(f"Error ending stream for {session_id}: {e}", exc_info=True)
# Still signal completion even on error so conversation job doesn't hang
try:
completion_key = f"transcription:complete:{session_id}"
await self.redis_client.set(completion_key, "error", ex=300)
logger.warning(f"⚠️ Set error completion signal for {session_id}")
except Exception:
pass # Best effort
Comment on lines +175 to +181

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, find and examine the file
find . -type f -name "streaming_consumer.py" | grep -E "transcription|advanced_omi_backend"

Repository: SimpleOpenSoftware/chronicle

Length of output: 161


🏁 Script executed:

# Check the file exists and read around the specified lines
if [ -f "backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py" ]; then
  echo "=== File found, showing lines 170-185 ==="
  sed -n '170,185p' "backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py" | cat -n
else
  echo "File not found, searching for similar paths..."
  find . -name "streaming_consumer.py" -type f
fi

Repository: SimpleOpenSoftware/chronicle

Length of output: 1089


🏁 Script executed:

# Get file size first to see if it's manageable
wc -l "backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py" 2>/dev/null || echo "File path needs adjustment"

Repository: SimpleOpenSoftware/chronicle

Length of output: 163


🏁 Script executed:

# Search for error handling patterns (except blocks) in the file
rg "except\s+Exception" "backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py" -A 2 -B 1 || echo "Pattern not found with that path"

Repository: SimpleOpenSoftware/chronicle

Length of output: 1821


🏁 Script executed:

# Also check how logging is used in the file
rg "logger\.(debug|warning|error|info)" "backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py" | head -20

Repository: SimpleOpenSoftware/chronicle

Length of output: 1785


Don't swallow errors when setting the completion signal.

The bare except Exception: pass hides failures whose details would be useful for debugging. Similar error handlers in the module log Redis failures (e.g., logger.warning(f"Failed to set error flag in Redis: {redis_error}")), so logging here at least at debug/warning level keeps the "best effort" behavior while preserving observability.

🔧 Suggested fix
             try:
                 completion_key = f"transcription:complete:{session_id}"
                 await self.redis_client.set(completion_key, "error", ex=300)
                 logger.warning(f"⚠️ Set error completion signal for {session_id}")
-            except Exception:
-                pass  # Best effort
+            except Exception as signal_err:
+                logger.debug(
+                    f"Failed to set completion signal for {session_id}: {signal_err}",
+                    exc_info=True
+                )  # Best effort
🧰 Tools
🪛 Ruff (0.14.14)

180-181: try-except-pass detected, consider logging the exception

(S110)


180-180: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In
`@backends/advanced/src/advanced_omi_backend/services/transcription/streaming_consumer.py`
around lines 175-181, the bare "except Exception: pass" in the block that sets
completion_key (f"transcription:complete:{session_id}") hides Redis failures;
change it to catch the exception in a variable (e.g., except Exception as
redis_error:) and log it (e.g., logger.warning or logger.debug) with context
like "Failed to set error flag in Redis" including redis_error, while keeping
the best-effort behavior (do not re-raise) and referencing self.redis_client,
completion_key, session_id, and logger so observability is preserved.


async def process_audio_chunk(self, session_id: str, audio_chunk: bytes, chunk_id: str):
"""
@@ -250,11 +261,11 @@ async def publish_to_client(self, session_id: str, result: Dict, is_final: bool)

async def store_final_result(self, session_id: str, result: Dict, chunk_id: str = None):
"""
Store final transcription result to Redis Stream with cumulative timestamp adjustment.
Store final transcription result to Redis Stream.

Transcription providers return word timestamps that reset to 0 for each chunk.
We maintain a running audio_offset_seconds to make timestamps cumulative across
the session, enabling accurate speech duration calculation for speech detection.
Note: Deepgram streaming WebSocket maintains state and returns cumulative
timestamps from the start of the stream. No offset adjustment is needed.
Previous code incorrectly assumed per-chunk timestamps starting at 0.

Args:
session_id: Session ID
@@ -264,38 +275,9 @@ async def store_final_result(self, session_id: str, result: Dict, chunk_id: str
try:
stream_name = f"transcription:results:{session_id}"

# Get cumulative audio offset for this session
audio_offset = 0.0
chunk_duration = 0.0
if session_id in self.active_sessions:
audio_offset = self.active_sessions[session_id].get("audio_offset_seconds", 0.0)

# Adjust word timestamps by cumulative offset
# Get words and segments directly - Deepgram returns cumulative timestamps
words = result.get("words", [])
adjusted_words = []
if words:
for word in words:
adjusted_word = word.copy()
adjusted_word["start"] = word.get("start", 0.0) + audio_offset
adjusted_word["end"] = word.get("end", 0.0) + audio_offset
adjusted_words.append(adjusted_word)

# Calculate chunk duration from last word's end time
if adjusted_words:
last_word_end = words[-1].get("end", 0.0) # Use unadjusted for duration calc
chunk_duration = last_word_end

logger.debug(f"➡️ [STREAMING] Adjusted {len(adjusted_words)} words by +{audio_offset:.1f}s (chunk_duration={chunk_duration:.1f}s)")

# Adjust segment timestamps too
segments = result.get("segments", [])
adjusted_segments = []
if segments:
for seg in segments:
adjusted_seg = seg.copy()
adjusted_seg["start"] = seg.get("start", 0.0) + audio_offset
adjusted_seg["end"] = seg.get("end", 0.0) + audio_offset
adjusted_segments.append(adjusted_seg)

# Prepare result entry - MUST match aggregator's expected schema
# All keys and values must be bytes to match consumer.py format
Expand All @@ -308,23 +290,17 @@ async def store_final_result(self, session_id: str, result: Dict, chunk_id: str
b"timestamp": str(time.time()).encode(),
}

# Add adjusted JSON fields
if adjusted_words:
entry[b"words"] = json.dumps(adjusted_words).encode()
# Add words and segments directly (already have cumulative timestamps from Deepgram)
if words:
entry[b"words"] = json.dumps(words).encode()

if adjusted_segments:
entry[b"segments"] = json.dumps(adjusted_segments).encode()
if segments:
entry[b"segments"] = json.dumps(segments).encode()

# Write to Redis Stream
await self.redis_client.xadd(stream_name, entry)

# Update cumulative offset for next chunk
if session_id in self.active_sessions and chunk_duration > 0:
self.active_sessions[session_id]["audio_offset_seconds"] += chunk_duration
new_offset = self.active_sessions[session_id]["audio_offset_seconds"]
logger.info(f"💾 Stored final result to {stream_name}: {result.get('text', '')[:50]}... (offset: {audio_offset:.1f}s → {new_offset:.1f}s)")
else:
logger.info(f"💾 Stored final result to {stream_name}: {result.get('text', '')[:50]}...")
logger.info(f"💾 Stored final result to {stream_name}: {result.get('text', '')[:50]}... ({len(words)} words)")

except Exception as e:
logger.error(f"Error storing final result for {session_id}: {e}", exc_info=True)
@@ -593,6 +593,30 @@ async def open_conversation_job(
# to avoid false negatives from aggregated results lacking proper word-level data
logger.info("✅ Conversation has meaningful speech (validated during streaming), proceeding with post-processing")

# Wait for streaming transcription consumer to complete before reading transcript
# This fixes the race condition where conversation job reads transcript before
# streaming consumer stores all final results (seen as 24+ second delay in logs)
completion_key = f"transcription:complete:{session_id}"
max_wait_streaming = 30 # seconds
waited_streaming = 0.0
while waited_streaming < max_wait_streaming:
completion_status = await redis_client.get(completion_key)
if completion_status:
status_str = completion_status.decode() if isinstance(completion_status, bytes) else completion_status
if status_str == "error":
logger.warning(f"⚠️ Streaming transcription ended with error for {session_id}, proceeding anyway")
else:
logger.info(f"✅ Streaming transcription confirmed complete for {session_id}")
break
await asyncio.sleep(0.5)
waited_streaming += 0.5

if waited_streaming >= max_wait_streaming:
logger.warning(
f"⚠️ Timed out waiting for streaming completion signal for {session_id} "
f"(waited {max_wait_streaming}s), proceeding with available transcript"
)

# Wait for audio_streaming_persistence_job to complete and write MongoDB chunks
from advanced_omi_backend.utils.audio_chunk_utils import wait_for_audio_chunks
