feat(asr): ✨ add audio transcription endpoint #71
base: release/v2.0.0
Conversation
- Introduced a new endpoint for audio transcription using websockets.
- Implemented streaming of audio files to an external transcription service.
- Added necessary models and validation for transcription responses.
- Updated settings to include TRANSCRIBE_WS_URI for configuration.
Reviewer's Guide

Adds a new FastAPI audio transcription endpoint that streams uploaded audio over a websocket to an external ASR service, validates and normalizes the streaming responses into typed models, and exposes them as a structured transcription API, with corresponding configuration and dependency updates.

Sequence diagram for the new audio transcription endpoint

sequenceDiagram
actor Client
participant FastAPIApp
participant TranscribeEndpoint as transcribe
participant AsrWebsocketClient as websockets_client
participant ExternalASRService
Client->>FastAPIApp: POST /transcribe (file)
FastAPIApp->>TranscribeEndpoint: transcribe(file)
TranscribeEndpoint->>TranscribeEndpoint: check settings.TRANSCRIBE_WS_URI
alt missing URI
TranscribeEndpoint-->>Client: HTTP 500 (TRANSCRIBE_WS_URI not configured)
else configured
TranscribeEndpoint->>AsrWebsocketClient: connect(TRANSCRIBE_WS_URI)
AsrWebsocketClient->>ExternalASRService: WebSocket handshake
activate ExternalASRService
TranscribeEndpoint->>TranscribeEndpoint: create_task(_receive_updates)
TranscribeEndpoint->>TranscribeEndpoint: _stream_audio(file, websocket)
loop audio chunks
TranscribeEndpoint->>ExternalASRService: send(chunk bytes)
ExternalASRService-->>AsrWebsocketClient: partial JSON updates
AsrWebsocketClient->>TranscribeEndpoint: _receive_updates parses WLKMessageRawResponse
end
TranscribeEndpoint->>ExternalASRService: send(empty frame)
ExternalASRService-->>AsrWebsocketClient: status active_transcription
AsrWebsocketClient->>TranscribeEndpoint: WLKMessageStatus(last_active)
ExternalASRService-->>AsrWebsocketClient: ready_to_stop
AsrWebsocketClient->>TranscribeEndpoint: WLKMessageReadyToStopMessage
deactivate ExternalASRService
TranscribeEndpoint->>TranscribeEndpoint: map lines to list TranscriptionItem
TranscribeEndpoint-->>Client: 200 OK (list TranscriptionItem)
end
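The "audio chunks" loop in the diagram amounts to slicing the decoded signal into fixed-size windows and sending their bytes. A minimal sketch of that chunking step, assuming PCM16 framing and the `SAMPLE_RATE_HZ`/`CHUNK_SAMPLES` constants seen in the diff (their values here are guesses, not taken from the PR):

```python
import numpy as np

SAMPLE_RATE_HZ = 16_000          # assumed target sample rate
CHUNK_SAMPLES = SAMPLE_RATE_HZ   # assumed 1-second chunks


def iter_chunks(audio: np.ndarray) -> list[bytes]:
    """Split a mono float signal into fixed-size PCM16 byte chunks."""
    chunks = []
    for i in range(0, len(audio), CHUNK_SAMPLES):
        window = audio[i : i + CHUNK_SAMPLES]
        # Convert float [-1, 1] samples to 16-bit PCM bytes for the wire.
        pcm16 = (np.clip(window, -1.0, 1.0) * 32767).astype(np.int16)
        chunks.append(pcm16.tobytes())
    return chunks


# Two seconds plus one sample of silence -> three chunks.
chunks = iter_chunks(np.zeros(SAMPLE_RATE_HZ * 2 + 1, dtype=np.float32))
```

Each chunk would then be passed to `websocket.send(chunk)`, followed by the empty frame that signals end of audio.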
Class diagram for new ASR websocket models and transcription item

classDiagram
class BaseModel
class WLKMessageModelConfig {
+str asr_model
+str asr_backend
+str diarization_model
+str diarization_backend
}
class WLKMessageConfig {
+Literal_config type
+bool useAudioWorklet
+WLKMessageModelConfig models
}
class WLKMessageTranscriptionLine {
+int speaker
+str text
+timedelta start
+timedelta end
+bool final
+str speaker_id
+str detected_language
+parse_hhmmss(value) timedelta
}
class WLKMessageStatus {
+str status
+list~WLKMessageTranscriptionLine~ lines
+str buffer_transcription
+str buffer_diarization
+str buffer_translation
+float remaining_time_transcription
+float remaining_time_diarization
+dict~str,str~ speaker_ids
}
class WLKMessageSpeakerEmbeddings {
+Literal_speaker_embeddings type
+dict~str,str~ speaker_ids
+int speaker_id_bits
+WLKMessageModelConfig models
}
class WLKMessageReadyToStopMessage {
+Literal_ready_to_stop type
}
class WLKMessageRawResponse {
<<union>>
WLKMessageConfig
WLKMessageStatus
WLKMessageSpeakerEmbeddings
WLKMessageReadyToStopMessage
}
class TranscriptionItem {
+int speaker_no
+str speaker_id
+timedelta start
+timedelta end
+str text
+parse_hhmmss(value) timedelta
}
BaseModel <|-- WLKMessageModelConfig
BaseModel <|-- WLKMessageConfig
BaseModel <|-- WLKMessageTranscriptionLine
BaseModel <|-- WLKMessageStatus
BaseModel <|-- WLKMessageSpeakerEmbeddings
BaseModel <|-- WLKMessageReadyToStopMessage
BaseModel <|-- TranscriptionItem
WLKMessageConfig --> WLKMessageModelConfig : models
WLKMessageSpeakerEmbeddings --> WLKMessageModelConfig : models
WLKMessageStatus --> WLKMessageTranscriptionLine : lines
WLKMessageRawResponse ..> WLKMessageConfig
WLKMessageRawResponse ..> WLKMessageStatus
WLKMessageRawResponse ..> WLKMessageSpeakerEmbeddings
WLKMessageRawResponse ..> WLKMessageReadyToStopMessage
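The `WLKMessageRawResponse` union above is what `ASR_RAW_RESPONSE_ADAPTER.validate_python` (visible in the diff) dispatches over. A reduced sketch with only two of the message models, assuming pydantic v2:

```python
from typing import Literal, Union

from pydantic import BaseModel, TypeAdapter


class WLKMessageReadyToStopMessage(BaseModel):
    type: Literal["ready_to_stop"]


class WLKMessageStatus(BaseModel):
    status: str
    lines: list = []


# Validate a raw websocket payload against the union of message models;
# only two of the four models from the class diagram are reproduced here.
ASR_RAW_RESPONSE_ADAPTER = TypeAdapter(
    Union[WLKMessageReadyToStopMessage, WLKMessageStatus]
)

msg = ASR_RAW_RESPONSE_ADAPTER.validate_python({"type": "ready_to_stop"})
```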
File-Level Changes
Hey - I've found 3 issues, and left some high level feedback:
- The `_stream_audio` function reads the entire uploaded file into memory and then decodes/resamples it with `librosa.load`; if you expect large inputs, consider enforcing a maximum file size/duration or using a more streaming-friendly decoding approach to avoid high memory usage and latency.
- `WLKMessageStatus.status` is typed as a plain `str` but later matched against the literal value `"active_transcription"`; tightening this to a `Literal[...]` (or an enum) would make the pattern matching and validation more robust and self-documenting.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `_stream_audio` function reads the entire uploaded file into memory and then decodes/resamples it with `librosa.load`; if you expect large inputs, consider enforcing a maximum file size/duration or using a more streaming-friendly decoding approach to avoid high memory usage and latency.
- `WLKMessageStatus.status` is typed as a plain `str` but later matched against the literal value `"active_transcription"`; tightening this to a `Literal[...]` (or an enum) would make the pattern matching and validation more robust and self-documenting.
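The second suggestion could be sketched like this; `"active_transcription"` is the only status value visible in this PR, so the Literal's members are an assumption to extend as other statuses are discovered:

```python
from typing import Literal

from pydantic import BaseModel, ValidationError


class WLKMessageStatus(BaseModel):
    # Tightened from plain `str`; extend the Literal as new statuses appear.
    status: Literal["active_transcription"]
```

With this change, an unexpected status fails validation up front instead of silently falling through the `match` statement.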
## Individual Comments
### Comment 1
<location> `aymurai/api/endpoints/routers/asr/transcribe.py:38-39` </location>
<code_context>
+ file: UploadFile,
+ websocket: websockets.ClientConnection,
+) -> int:
+ payload = await file.read()
+ audio, _ = librosa.load(io.BytesIO(payload), sr=SAMPLE_RATE_HZ, mono=True)
+ total_bytes = 0
+ for i in range(0, len(audio), CHUNK_SAMPLES):
</code_context>
<issue_to_address>
**suggestion (performance):** Reading the entire file into memory can be problematic for large uploads.
This reads the entire upload into memory before decoding, which risks excessive memory use or crashes with large files. Please either enforce and validate a maximum upload size before reading, or switch to a streaming-friendly decoding approach if supported by your ASR stack.
</issue_to_address>
### Comment 2
<location> `aymurai/api/endpoints/routers/asr/transcribe.py:80-89` </location>
<code_context>
+ return None
+
+
+async def _receive_updates(
+ websocket: websockets.ClientConnection,
+) -> WLKMessageStatus | None:
+ last_active_transcription: WLKMessageStatus | None = None
+ while True:
+ try:
+ msg = await websocket.recv()
+ except websockets.exceptions.ConnectionClosedOK:
+ logger.info("connection closed normally")
+ break
+ except websockets.exceptions.WebSocketException as exc:
+ logger.error(f"websocket error while receiving updates: {exc}")
+ break
+
+ parsed = _parse_ws_message(msg)
+ match parsed:
+ case None:
+ continue
+ case WLKMessageStatus(status="active_transcription") as message:
+ last_active_transcription = message
+ case WLKMessageReadyToStopMessage():
+ break
+
+ return last_active_transcription
+
+
</code_context>
<issue_to_address>
**issue:** Websocket receive loop may never terminate if the remote service misbehaves.
The loop only exits on specific close/exception cases or a `ready_to_stop` message. If the server stays open but stops sending messages, this coroutine will hang and `transcribe` will never return. Please add a safeguard (e.g., overall timeout via `asyncio.wait_for`, periodic inactivity timeout, or a max message count) to handle a stalled backend.
</issue_to_address>
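A minimal safeguard along these lines wraps each `recv()` in `asyncio.wait_for`; the timeout value below is a placeholder for a per-message inactivity budget:

```python
import asyncio

RECV_TIMEOUT_S = 30.0  # assumed inactivity budget per message


async def recv_with_timeout(websocket, timeout: float = RECV_TIMEOUT_S):
    """Wrap websocket.recv() so a silent backend cannot hang the loop forever.

    Returns None on timeout; the caller treats None as a stalled backend
    and breaks out of the receive loop instead of waiting indefinitely.
    """
    try:
        return await asyncio.wait_for(websocket.recv(), timeout=timeout)
    except asyncio.TimeoutError:
        return None
```

In `_receive_updates`, `msg = await websocket.recv()` would become `msg = await recv_with_timeout(websocket)` with a `break` on `None`. An overall deadline around the whole `transcribe` call would work as well.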
### Comment 3
<location> `aymurai/api/endpoints/routers/asr/transcribe.py:63-72` </location>
<code_context>
+ try:
+ async with websockets.connect(settings.TRANSCRIBE_WS_URI) as websocket:
+ receive_task = asyncio.create_task(_receive_updates(websocket))
+ try:
+ total_bytes = await _stream_audio(file, websocket)
+ await websocket.send(b"")
+ logger.info(f"sent {total_bytes} bytes to transcription service")
+ last_active_transcription = await receive_task
+ except Exception:
+ if not receive_task.done():
+ receive_task.cancel()
+ with contextlib.suppress(asyncio.CancelledError):
+ await receive_task
+ raise
+ except websockets.exceptions.WebSocketException as exc:
+ logger.error(f"websocket error during transcription: {exc}")
+ raise HTTPException(
+ status_code=status.HTTP_502_BAD_GATEWAY,
+ detail="Transcription service websocket error",
</code_context>
<issue_to_address>
**suggestion (bug_risk):** All non-websocket errors are mapped to a generic 500, masking client-side issues.
Exceptions from `_stream_audio` (e.g. invalid/unsupported audio format, decoding errors) are all surfaced as 5xx. Consider explicitly catching known decoding/validation errors and returning a 4xx for bad input, keeping 5xx for genuine backend or internal failures.
</issue_to_address>
payload = await file.read()
audio, _ = librosa.load(io.BytesIO(payload), sr=SAMPLE_RATE_HZ, mono=True)
suggestion (performance): Reading the entire file into memory can be problematic for large uploads.
This reads the entire upload into memory before decoding, which risks excessive memory use or crashes with large files. Please either enforce and validate a maximum upload size before reading, or switch to a streaming-friendly decoding approach if supported by your ASR stack.
async def _receive_updates(
    websocket: websockets.ClientConnection,
) -> WLKMessageStatus | None:
    last_active_transcription: WLKMessageStatus | None = None
    while True:
        try:
            msg = await websocket.recv()
        except websockets.exceptions.ConnectionClosedOK:
            logger.info("connection closed normally")
            break
issue: Websocket receive loop may never terminate if the remote service misbehaves.
The loop only exits on specific close/exception cases or a ready_to_stop message. If the server stays open but stops sending messages, this coroutine will hang and transcribe will never return. Please add a safeguard (e.g., overall timeout via asyncio.wait_for, periodic inactivity timeout, or a max message count) to handle a stalled backend.
try:
    parsed = json.loads(message)
except json.JSONDecodeError:
    logger.warning(f"received non-json websocket payload: {payload_preview}")
    return None


try:
    return ASR_RAW_RESPONSE_ADAPTER.validate_python(parsed)
except ValidationError as exc:
    logger.warning(
suggestion (bug_risk): All non-websocket errors are mapped to a generic 500, masking client-side issues.
Exceptions from _stream_audio (e.g. invalid/unsupported audio format, decoding errors) are all surfaced as 5xx. Consider explicitly catching known decoding/validation errors and returning a 4xx for bad input, keeping 5xx for genuine backend or internal failures.
Summary by Sourcery
Add a new audio transcription API that streams uploaded audio to an external websocket-based ASR service and returns structured transcription results.
New Features:
Enhancements:
Build: