From 22eedde34817c5a023b6e8699b6e7062cdccf497 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Thu, 6 Nov 2025 21:33:14 +0300 Subject: [PATCH 01/14] update benchmarks.md --- BENCHMARKS.md | 60 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index 58e69b0..c7b1232 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -479,8 +479,8 @@ ### OpenAI - **Latest Run:** `2025-11-06` -- **Model Version:** `whisper-1` (also known as `large-v2`) -- **Configuration:** OpenAI's Whisper API for transcription with keyword prompting. See [OpenAI Speech to Text: Prompting](https://platform.openai.com/docs/guides/speech-to-text#prompting) for more details. +- **Model Version:** `whisper-1` +- **Configuration:** OpenAI's Whisper API for transcription with prompt-based keyword boosting. See [OpenAI Whisper API](https://platform.openai.com/docs/guides/speech-to-text) for more details. - **Code Reference:** [openbench/pipeline/transcription/transcription_openai.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_openai.py) - **Hardware**: Unknown (Cloud API) @@ -498,7 +498,14 @@ - **Code Reference:** [openbench/pipeline/transcription/transcription_oss_whisper.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_oss_whisper.py) - **Hardware**: M2 Ultra Mac Studio -### Argmax +### Argmax (Parakeet V2) +- **Latest Run:** `2025-11-06` +- **Model Version:** `parakeet-v2` +- **Configuration:** Argmax WhisperKit Pro with compressed Parakeet V2 model (i.e. `parakeet-v2_476MB`) with custom vocabulary support. +- **Code Reference:** [openbench/pipeline/transcription/transcription_whisperkitpro.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_whisperkitpro.py) +- **Hardware**: M2 Ultra Mac Studio + +### Argmax (Parakeet V3) - **Latest Run:** `2025-11-06` - **Model Version:** `parakeet-v3` - **Configuration:** Argmax SDK WhisperKit Pro framework with compressed Parakeet V3 model (i.e. `parakeet-v3_494MB`) and Custom Vocabulary feature enabled. See [Argmax Custom Vocabulary](https://app.argmaxinc.com/docs/examples/custom-vocabulary) for more details. @@ -524,6 +531,21 @@
+## Keywords + +
+Click to expand + +The dataset consists of trimmed audio samples extracted from 1-2 hour earnings call recordings. Custom vocabulary evaluation uses two keyword categories to assess system performance under different vocabulary conditions: + +**Chunk keywords** include only the keywords that actually appear in the trimmed audio chunk being evaluated. This represents a focused vocabulary set that matches the content of the specific audio sample. + +**File keywords** include all keywords from the entire source file (1-2 hours) that the audio chunk was trimmed from. This means the system is provided with a comprehensive vocabulary list that may contain keywords not present in the actual audio chunk being transcribed. This tests a system's ability to handle extensive custom vocabularies without introducing false positives from irrelevant keywords. + +
+ +
+ ## Word Error Rate (WER)
@@ -537,10 +559,10 @@
-| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(parakeet-v3) | -|----------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:| -| earnings22-keywords
(chunk-keywords) | 9.32 | 9.74 | 9.68 | 10 | 9.21 | -| earnings22-keywords
(file-keywords) | 9.81 | 9.57 | 9.63 | 10.3 | 9.63 | +| Dataset | Keywords | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | +|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| +| earnings22-keywords | Chunk | 9.32 | 9.74 | 9.68 | 10 | 9.5 | 9.21 | +| earnings22-keywords | File | 9.81 | 9.57 | 9.63 | 10.3 | 10 | 9.63 |

@@ -563,10 +585,10 @@ If the model predicts 20 keywords and 15 of them match the ground truth, precisi -| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(parakeet-v3) | -|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:| -| earnings22-keywords
(chunk-keywords) |0.98 | 0.98 | 0.98 | 0.96 | 0.96 | -| earnings22-keywords
(file-keywords) |0.94 | 0.9 | 0.94 | 0.95 | 0.87 | +| Dataset | Keywords | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | +|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| +| earnings22-keywords | Chunk |0.98 | 0.98 | 0.98 | 0.96 | 0.96 | 0.96 | +| earnings22-keywords | File |0.94 | 0.9 | 0.94 | 0.95 | 0.87 | 0.87 |

@@ -589,10 +611,10 @@ If the ground-truth transcript has 25 keywords and the model correctly finds 15, -| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(parakeet-v3) | -|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:| -| earnings22-keywords
(chunk-keywords) | 0.89 | 0.69 | 0.7 | 0.81 | 0.88 | -| earnings22-keywords
(file-keywords) | 0.83 | 0.79 | 0.68 | 0.76 | 0.85 | +| Dataset | Keywords | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | +|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| +| earnings22-keywords | Chunk | 0.89 | 0.69 | 0.7 | 0.81 | 0.89 | 0.88 | +| earnings22-keywords | File | 0.83 | 0.79 | 0.68 | 0.76 | 0.86 | 0.85 |

@@ -616,10 +638,10 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa -| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(parakeet-v3) | -|----------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:| -| earnings22-keywords
(chunk-keywords) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | -| earnings22-keywords
(file-keywords) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 | +| Dataset | Keywords | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | +|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| +| earnings22-keywords | Chunk | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | +| earnings22-keywords | File | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 |

From 9eaa8a797cb23ad9c84a1442f9018dad864365d1 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Thu, 6 Nov 2025 21:34:54 +0300 Subject: [PATCH 02/14] Refactor --- BENCHMARKS.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index c7b1232..bf62298 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -538,9 +538,9 @@ The dataset consists of trimmed audio samples extracted from 1-2 hour earnings call recordings. Custom vocabulary evaluation uses two keyword categories to assess system performance under different vocabulary conditions: -**Chunk keywords** include only the keywords that actually appear in the trimmed audio chunk being evaluated. This represents a focused vocabulary set that matches the content of the specific audio sample. +**Chunk keywords**: include only the keywords that actually appear in the trimmed audio chunk being evaluated. This represents a focused vocabulary set that matches the content of the specific audio sample. -**File keywords** include all keywords from the entire source file (1-2 hours) that the audio chunk was trimmed from. This means the system is provided with a comprehensive vocabulary list that may contain keywords not present in the actual audio chunk being transcribed. This tests a system's ability to handle extensive custom vocabularies without introducing false positives from irrelevant keywords. +**File keywords**: include all keywords from the entire source file (1-2 hours) that the audio chunk was trimmed from. This means the system is provided with a comprehensive vocabulary list that may contain keywords not present in the actual audio chunk being transcribed. This tests a system's ability to handle extensive custom vocabularies without introducing false positives from irrelevant keywords. From d3c92cd123f6b5c853468200e0b3de4c96541937 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Thu, 6 Nov 2025 22:41:28 +0300 Subject: [PATCH 03/14] Refactor --- BENCHMARKS.md | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index bf62298..abdece8 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -559,10 +559,10 @@ The dataset consists of trimmed audio samples extracted from 1-2 hour earnings c -| Dataset | Keywords | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | -|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| -| earnings22-keywords | Chunk | 9.32 | 9.74 | 9.68 | 10 | 9.5 | 9.21 | -| earnings22-keywords | File | 9.81 | 9.57 | 9.63 | 10.3 | 10 | 9.63 | +| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | +|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| +| earnings22-keywords (chunk-level) | 9.32 | 9.74 | 9.68 | 10 | 9.5 | 9.21 | +| earnings22-keywords (file-level) | 9.81 | 9.57 | 9.63 | 10.3 | 10 | 9.63 |

@@ -585,10 +585,10 @@ If the model predicts 20 keywords and 15 of them match the ground truth, precisi -| Dataset | Keywords | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | -|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| -| earnings22-keywords | Chunk |0.98 | 0.98 | 0.98 | 0.96 | 0.96 | 0.96 | -| earnings22-keywords | File |0.94 | 0.9 | 0.94 | 0.95 | 0.87 | 0.87 | +| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | +|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| +| earnings22-keywords (chunk-level) | 0.98 | 0.98 | 0.98 | 0.96 | 0.96 | 0.96 | +| earnings22-keywords (file-level) | 0.94 | 0.9 | 0.94 | 0.95 | 0.87 | 0.87 |

@@ -611,10 +611,10 @@ If the ground-truth transcript has 25 keywords and the model correctly finds 15, -| Dataset | Keywords | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | -|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| -| earnings22-keywords | Chunk | 0.89 | 0.69 | 0.7 | 0.81 | 0.89 | 0.88 | -| earnings22-keywords | File | 0.83 | 0.79 | 0.68 | 0.76 | 0.86 | 0.85 | +| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | +|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| +| earnings22-keywords (chunk-level) | 0.89 | 0.69 | 0.7 | 0.81 | 0.89 | 0.88 | +| earnings22-keywords (file-level) | 0.83 | 0.79 | 0.68 | 0.76 | 0.86 | 0.85 |

@@ -638,10 +638,10 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa -| Dataset | Keywords | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | -|----------------------|:---------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| -| earnings22-keywords | Chunk | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | -| earnings22-keywords | File | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 | +| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | +|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| +| earnings22-keywords (chunk-level) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | +| earnings22-keywords (file-level) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 |

From f0e2dec7ac629f9594d4b2d94bbf344c816f4a98 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:41:56 +0300 Subject: [PATCH 04/14] add deepgram streaming diarization pipeline --- .../datasets/callhome_english.yaml | 6 + config/benchmark_config/metrics/cpwer.yaml | 3 + .../DeepgramStreamingDiarizationPipeline.yaml | 8 + src/openbench/dataset/dataset_aliases.py | 1 + src/openbench/dataset/dataset_registry.py | 7 +- src/openbench/metric/registry.py | 6 +- .../word_error_metrics/word_error_metrics.py | 10 +- src/openbench/pipeline/__init__.py | 1 + .../streaming_diarization/__init__.py | 13 + .../pipeline/streaming_diarization/common.py | 13 + .../streaming_diarization/deepgram.py | 316 ++++++++++++++++++ src/openbench/pipeline_prediction.py | 56 ++++ src/openbench/runner/benchmark.py | 10 +- src/openbench/types.py | 1 + 14 files changed, 445 insertions(+), 6 deletions(-) create mode 100644 config/benchmark_config/datasets/callhome_english.yaml create mode 100644 config/benchmark_config/metrics/cpwer.yaml create mode 100644 config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml create mode 100644 src/openbench/pipeline/streaming_diarization/__init__.py create mode 100644 src/openbench/pipeline/streaming_diarization/common.py create mode 100644 src/openbench/pipeline/streaming_diarization/deepgram.py diff --git a/config/benchmark_config/datasets/callhome_english.yaml b/config/benchmark_config/datasets/callhome_english.yaml new file mode 100644 index 0000000..c6315a3 --- /dev/null +++ b/config/benchmark_config/datasets/callhome_english.yaml @@ -0,0 +1,6 @@ +callhome_english: + dataset_id: argmaxinc/callhome-english + split: test + + + diff --git a/config/benchmark_config/metrics/cpwer.yaml b/config/benchmark_config/metrics/cpwer.yaml new file mode 100644 index 0000000..c10d6c9 --- /dev/null +++ b/config/benchmark_config/metrics/cpwer.yaml @@ -0,0 +1,3 @@ +# Dummy argument to make the config file valid +cpwer: + skip_overlap: false \ No newline at end of file diff --git a/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml b/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml new file mode 100644 index 0000000..3f458e5 --- /dev/null +++ b/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml @@ -0,0 +1,8 @@ +DeepgramStreamingDiarizationPipeline: + config: + endpoint_url: "wss://api.deepgram.com" + sample_rate: 16000 + channels: 1 + sample_width: 2 + realtime_resolution: 0.020 + model_version: "nova-3" \ No newline at end of file diff --git a/src/openbench/dataset/dataset_aliases.py b/src/openbench/dataset/dataset_aliases.py index 14d7ac9..99bab3a 100644 --- a/src/openbench/dataset/dataset_aliases.py +++ b/src/openbench/dataset/dataset_aliases.py @@ -172,6 +172,7 @@ def register_dataset_aliases() -> None: supported_pipeline_types={ PipelineType.TRANSCRIPTION, PipelineType.ORCHESTRATION, + PipelineType.STREAMING_DIARIZATION, }, description=( "Callhome English dataset for transcription and orchestration evaluation. " diff --git a/src/openbench/dataset/dataset_registry.py b/src/openbench/dataset/dataset_registry.py index f73ae17..1fdfef7 100644 --- a/src/openbench/dataset/dataset_registry.py +++ b/src/openbench/dataset/dataset_registry.py @@ -137,5 +137,10 @@ def has_alias(cls, alias: str) -> bool: # Register all datasets DatasetRegistry.register(PipelineType.DIARIZATION, DiarizationDataset) DatasetRegistry.register(PipelineType.ORCHESTRATION, OrchestrationDataset) -DatasetRegistry.register(PipelineType.STREAMING_TRANSCRIPTION, StreamingDataset) +DatasetRegistry.register( + PipelineType.STREAMING_TRANSCRIPTION, StreamingDataset +) +DatasetRegistry.register( + PipelineType.STREAMING_DIARIZATION, OrchestrationDataset +) DatasetRegistry.register(PipelineType.TRANSCRIPTION, TranscriptionDataset) diff --git a/src/openbench/metric/registry.py b/src/openbench/metric/registry.py index 99e991b..d8eec6b 100644 --- a/src/openbench/metric/registry.py +++ b/src/openbench/metric/registry.py @@ -135,7 +135,11 @@ def get_available_metrics(cls, pipeline_type: PipelineType) -> list[MetricOption # Register all existing and interesting metrics from pyannote.metrics # Custom metrics will be registered in their own files -MetricRegistry.register(DiarizationErrorRate, PipelineType.DIARIZATION, MetricOptions.DER) +MetricRegistry.register( + DiarizationErrorRate, + (PipelineType.DIARIZATION, PipelineType.STREAMING_DIARIZATION), + MetricOptions.DER, +) MetricRegistry.register(JaccardErrorRate, PipelineType.DIARIZATION, MetricOptions.JER) MetricRegistry.register(DiarizationPurity, PipelineType.DIARIZATION, MetricOptions.DIARIZATION_PURITY) MetricRegistry.register( diff --git a/src/openbench/metric/word_error_metrics/word_error_metrics.py b/src/openbench/metric/word_error_metrics/word_error_metrics.py index c4e7fe2..c3719ae 100644 --- a/src/openbench/metric/word_error_metrics/word_error_metrics.py +++ b/src/openbench/metric/word_error_metrics/word_error_metrics.py @@ -90,7 +90,9 @@ def _get_word_error_metrics( return result.ops[0], (ref_words, hyp_words), (ref_speakers, hyp_speakers) -@MetricRegistry.register_metric(PipelineType.ORCHESTRATION, MetricOptions.WDER) +@MetricRegistry.register_metric( + (PipelineType.ORCHESTRATION, PipelineType.STREAMING_DIARIZATION), MetricOptions.WDER +) class WordDiarizationErrorRate(BaseWordErrorMetric): """Word Diarization Error Rate (WDER) implementation. @@ -222,6 +224,7 @@ def compute_metric(self, detail: Details) -> float: PipelineType.TRANSCRIPTION, PipelineType.ORCHESTRATION, PipelineType.STREAMING_TRANSCRIPTION, + PipelineType.STREAMING_DIARIZATION, ), MetricOptions.WER, ) @@ -296,7 +299,10 @@ def compute_metric(self, detail: Details) -> float: return (S + D + I) / N if N > 0 else 0.0 -@MetricRegistry.register_metric(PipelineType.ORCHESTRATION, MetricOptions.CPWER) +@MetricRegistry.register_metric( + (PipelineType.ORCHESTRATION, PipelineType.STREAMING_DIARIZATION), + MetricOptions.CPWER, +) class ConcatenatedMinimumPermutationWER(BaseWordErrorMetric): """Concatenated minimum-Permutation Word Error Rate (cpWER) implementation. diff --git a/src/openbench/pipeline/__init__.py b/src/openbench/pipeline/__init__.py index 598328b..bc405bd 100644 --- a/src/openbench/pipeline/__init__.py +++ b/src/openbench/pipeline/__init__.py @@ -6,6 +6,7 @@ from .diarization import * from .orchestration import * from .pipeline_registry import PipelineRegistry +from .streaming_diarization import * from .streaming_transcription import * from .transcription import * diff --git a/src/openbench/pipeline/streaming_diarization/__init__.py b/src/openbench/pipeline/streaming_diarization/__init__.py new file mode 100644 index 0000000..670f0b9 --- /dev/null +++ b/src/openbench/pipeline/streaming_diarization/__init__.py @@ -0,0 +1,13 @@ +# For licensing see accompanying LICENSE.md file. +# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. + +from .common import StreamingDiarizationConfig, StreamingDiarizationOutput +from .deepgram import DeepgramStreamingDiarizationPipeline, DeepgramStreamingDiarizationPipelineConfig + +__all__ = [ + "StreamingDiarizationConfig", + "StreamingDiarizationOutput", + "DeepgramStreamingDiarizationPipeline", + "DeepgramStreamingDiarizationPipelineConfig", +] + diff --git a/src/openbench/pipeline/streaming_diarization/common.py b/src/openbench/pipeline/streaming_diarization/common.py new file mode 100644 index 0000000..2704d9c --- /dev/null +++ b/src/openbench/pipeline/streaming_diarization/common.py @@ -0,0 +1,13 @@ +# For licensing see accompanying LICENSE.md file. +# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. + +from ...pipeline_prediction import Transcript +from ..base import PipelineConfig, PipelineOutput + + +class StreamingDiarizationConfig(PipelineConfig): + endpoint_url: str + + +class StreamingDiarizationOutput(PipelineOutput[Transcript]): ... + diff --git a/src/openbench/pipeline/streaming_diarization/deepgram.py b/src/openbench/pipeline/streaming_diarization/deepgram.py new file mode 100644 index 0000000..9bc045d --- /dev/null +++ b/src/openbench/pipeline/streaming_diarization/deepgram.py @@ -0,0 +1,316 @@ +# For licensing see accompanying LICENSE.md file. +# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. + +import asyncio +import json +import os + +import numpy as np +import websockets +from argmaxtools.utils import get_logger +from pyannote.core import Segment +from pydantic import Field + +from openbench.dataset import OrchestrationSample + +from ...pipeline import Pipeline, register_pipeline +from ...pipeline_prediction import ( + DiarizationAnnotation, + StreamingDiarization, + Transcript, + Word, +) +from ...types import PipelineType +from .common import StreamingDiarizationConfig, StreamingDiarizationOutput + + +logger = get_logger(__name__) + +# Some parts of this code are adapted from the Deepgram streaming example at: +# https://developers.deepgram.com/docs/measuring-streaming-latency + + +class DeepgramStreamingDiarizationApi: + def __init__(self, cfg) -> None: + self.realtime_resolution = 0.020 + self.model_version = "nova-3" + self.api_key = "38c68bfb62405f1ae17a840777ef531060018c3d" + assert ( + self.api_key is not None + ), "Please set DEEPGRAM_API_KEY in environment" + self.channels = cfg.channels + self.sample_width = cfg.sample_width + self.sample_rate = cfg.sample_rate + self.host_url = os.getenv( + "DEEPGRAM_HOST_URL", "wss://api.deepgram.com" + ) + + async def run(self, data, key, channels, sample_width, sample_rate): + """Connect to Deepgram real-time endpoint with diarization. + + This streams audio data in real-time and collects diarization results. + """ + # How many bytes are contained in one second of audio. + byte_rate = sample_width * sample_rate * channels + + # Variables for collecting results + audio_cursor = 0.0 + audio_cursor_l = [] + interim_annotations_l = [] + confirmed_audio_cursor_l = [] + confirmed_interim_annotations_l = [] + final_annotation = DiarizationAnnotation() + transcript_text = "" + words_with_speakers = [] + + # Connect to the real-time streaming endpoint with diarization + url = ( + f"{self.host_url}/v1/listen?" + f"model={self.model_version}&" + f"channels={channels}&" + f"sample_rate={sample_rate}&" + f"encoding=linear16&" + f"interim_results=true&" + f"diarize=true" + ) + async with websockets.connect( + url, + additional_headers={ + "Authorization": "Token {}".format(key), + }, + ) as ws: + + async def sender(ws): + """Sends the data, mimicking real-time connection.""" + nonlocal data, audio_cursor + try: + while len(data): + # Bytes in `REALTIME_RESOLUTION` seconds + i = int(byte_rate * self.realtime_resolution) + chunk, data = data[:i], data[i:] + # Send the data + await ws.send(chunk) + # Move the audio cursor + audio_cursor += self.realtime_resolution + # Mimic real-time by waiting + await asyncio.sleep(self.realtime_resolution) + + # A CloseStream message tells Deepgram that no more audio + # will be sent. Deepgram will close the connection once all + # audio has finished processing. + await ws.send(json.dumps({"type": "CloseStream"})) + except Exception as e: + logger.error(f"Error while sending: {e}") + raise + + async def receiver(ws): + """Collect diarization results from the server.""" + nonlocal audio_cursor + nonlocal interim_annotations_l + nonlocal audio_cursor_l + nonlocal confirmed_interim_annotations_l + nonlocal confirmed_audio_cursor_l + nonlocal final_annotation + nonlocal transcript_text + nonlocal words_with_speakers + + async for msg in ws: + msg = json.loads(msg) + + if "request_id" in msg: + # This is the final metadata message + continue + + # Process words with speaker information + if "channel" in msg and "alternatives" in msg["channel"]: + alternatives = msg["channel"]["alternatives"] + if ( + len(alternatives) > 0 + and "words" in alternatives[0] + ): + words = alternatives[0]["words"] + + # Create annotation from words + annotation = DiarizationAnnotation() + for word_info in words: + if ( + "speaker" in word_info + and "start" in word_info + and "end" in word_info + ): + speaker = word_info["speaker"] + start = word_info["start"] + end = word_info["end"] + segment = Segment(start, end) + annotation[segment] = ( + f"SPEAKER_{speaker}" + ) + + if len(annotation) > 0: + if not msg.get("is_final", False): + # Interim result + audio_cursor_l.append(audio_cursor) + interim_annotations_l.append( + annotation + ) + logger.debug( + f"Interim annotation with " + f"{len(annotation)} segments" + ) + else: + # Confirmed/final result + confirmed_audio_cursor_l.append( + audio_cursor + ) + confirmed_interim_annotations_l.append( + annotation + ) + + # Merge into final annotation + for ( + segment, + _, + speaker, + ) in annotation.itertracks( + yield_label=True + ): + final_annotation[segment] = speaker + + # Collect final transcript and words + for word_info in words: + if ( + "word" in word_info + and "speaker" in word_info + ): + speaker_label = ( + f"SPEAKER_" + f"{word_info['speaker']}" + ) + words_with_speakers.append({ + "word": word_info["word"], + "speaker": speaker_label, + "start": ( + word_info.get("start", 0) + ), + "end": ( + word_info.get("end", 0) + ), + }) + + # Build full transcript with tags + if words_with_speakers: + current_speaker = None + transcript_parts = [] + for w in words_with_speakers: + spk = w["speaker"] + if spk != current_speaker: + if ( + current_speaker + is not None + ): + transcript_parts.append( + "" + ) + transcript_parts.append( + f"[{spk}]" + ) + current_speaker = spk + transcript_parts.append( + w["word"] + ) + transcript_text = " ".join( + transcript_parts + ) + + logger.debug( + f"Confirmed annotation with " + f"{len(annotation)} segments" + ) + + await asyncio.wait([ + asyncio.ensure_future(sender(ws)), + asyncio.ensure_future(receiver(ws)) + ]) + + return ( + final_annotation, + interim_annotations_l, + audio_cursor_l, + confirmed_interim_annotations_l, + confirmed_audio_cursor_l, + transcript_text, + words_with_speakers, + ) + + def __call__(self, sample): + # Sample must be in bytes + ( + final_annotation, + interim_annotations, + audio_cursor_l, + confirmed_interim_annotations, + confirmed_audio_cursor_l, + transcript_text, + words_with_speakers, + ) = asyncio.get_event_loop().run_until_complete( + self.run( + sample, + self.api_key, + self.channels, + self.sample_width, + self.sample_rate, + ) + ) + + return { + "annotation": final_annotation, + "interim_annotations": interim_annotations, + "audio_cursor": audio_cursor_l, + "confirmed_interim_annotations": confirmed_interim_annotations, + "confirmed_audio_cursor": confirmed_audio_cursor_l, + "transcript_text": transcript_text, + "words": words_with_speakers, + } + + +class DeepgramStreamingDiarizationPipelineConfig(StreamingDiarizationConfig): + sample_rate: int + channels: int + sample_width: int + realtime_resolution: float + model_version: str = Field( + default="nova-3", + description="The model to use for real-time diarization" + ) + + +@register_pipeline +class DeepgramStreamingDiarizationPipeline(Pipeline): + _config_class = DeepgramStreamingDiarizationPipelineConfig + pipeline_type = PipelineType.STREAMING_DIARIZATION + + def parse_input(self, input_sample: OrchestrationSample): + y = input_sample.waveform + y_int16 = (y * 32767).astype(np.int16) + audio_data_byte = y_int16.T.tobytes() + return audio_data_byte + + def parse_output(self, output) -> StreamingDiarizationOutput: + # Create Transcript from words with speakers + # For cpWER/WDER, we return transcript as the main prediction + words = [ + Word( + word=w["word"], + start=w.get("start"), + end=w.get("end"), + speaker=w.get("speaker"), + ) + for w in output["words"] + ] + transcript = Transcript(words=words) + + return StreamingDiarizationOutput(prediction=transcript) + + def build_pipeline(self): + pipeline = DeepgramStreamingDiarizationApi(self.config) + return pipeline diff --git a/src/openbench/pipeline_prediction.py b/src/openbench/pipeline_prediction.py index 030d6d5..fc9ddb0 100644 --- a/src/openbench/pipeline_prediction.py +++ b/src/openbench/pipeline_prediction.py @@ -274,3 +274,59 @@ def to_annotation_file(self, output_dir: str, filename: str) -> str: json.dump(data, f, indent=2) return path + + +# Streaming Diarization Prediction +class StreamingDiarization(BaseModel): + """Streaming diarization output combining real-time and diarization.""" + + annotation: DiarizationAnnotation = Field( + ..., description="The final diarization annotation" + ) + transcript: "Transcript | None" = Field( + None, description="The transcript with speaker information" + ) + audio_cursor: list[float] | None = Field( + None, description="The audio cursor in seconds for interim results" + ) + interim_annotations: list[DiarizationAnnotation] | None = Field( + None, description="Interim diarization annotations" + ) + confirmed_audio_cursor: list[float] | None = Field( + None, description="The confirmed audio cursor in seconds" + ) + confirmed_interim_annotations: list[DiarizationAnnotation] | None = Field( + None, description="The confirmed interim diarization annotations" + ) + + class Config: + arbitrary_types_allowed = True + + def to_annotation_file(self, output_dir: str, filename: str) -> str: + """Save both the final annotation and streaming metadata.""" + # Save final annotation as RTTM + rttm_path = os.path.join(output_dir, f"{filename}.rttm") + with open(rttm_path, "w") as f: + self.annotation.write_rttm(f) + + # Save streaming metadata as JSON + json_path = os.path.join(output_dir, f"{filename}_streaming.json") + data = { + "audio_cursor": self.audio_cursor, + "confirmed_audio_cursor": self.confirmed_audio_cursor, + "num_interim_annotations": ( + len(self.interim_annotations) + if self.interim_annotations + else 0 + ), + "num_confirmed_interim_annotations": ( + len(self.confirmed_interim_annotations) + if self.confirmed_interim_annotations + else 0 + ), + } + + with open(json_path, "w") as f: + json.dump(data, f, indent=2) + + return rttm_path diff --git a/src/openbench/runner/benchmark.py b/src/openbench/runner/benchmark.py index e2c055f..c2d0f3f 100644 --- a/src/openbench/runner/benchmark.py +++ b/src/openbench/runner/benchmark.py @@ -33,6 +33,7 @@ PipelineType.TRANSCRIPTION: TranscriptionSampleResult, PipelineType.ORCHESTRATION: TranscriptionSampleResult, PipelineType.STREAMING_TRANSCRIPTION: TranscriptionSampleResult, + PipelineType.STREAMING_DIARIZATION: TranscriptionSampleResult, } @@ -64,6 +65,7 @@ def __init__(self, config: BenchmarkConfig, pipelines: list[Pipeline]): PipelineType.TRANSCRIPTION: TranscriptionWandbLogger, PipelineType.ORCHESTRATION: TranscriptionWandbLogger, PipelineType.STREAMING_TRANSCRIPTION: TranscriptionWandbLogger, + PipelineType.STREAMING_DIARIZATION: TranscriptionWandbLogger, } def _get_metrics(self, pipeline: Pipeline) -> dict[str, BaseMetric]: @@ -107,8 +109,12 @@ def _process_single_sample( ) if pipeline.pipeline_type == PipelineType.DIARIZATION: - sample_results_attributes["num_speakers_predicted"] = output.prediction.num_speakers - sample_results_attributes["num_speakers_reference"] = sample.reference.num_speakers + sample_results_attributes["num_speakers_predicted"] = ( + output.prediction.num_speakers + ) + sample_results_attributes["num_speakers_reference"] = ( + sample.reference.num_speakers + ) sample_result = sample_result_class(**sample_results_attributes) diff --git a/src/openbench/types.py b/src/openbench/types.py index b4bbeaa..8fa1398 100644 --- a/src/openbench/types.py +++ b/src/openbench/types.py @@ -12,6 +12,7 @@ class PipelineType(Enum): TRANSCRIPTION = "transcription" ORCHESTRATION = "orchestration" STREAMING_TRANSCRIPTION = "streaming_transcription" + STREAMING_DIARIZATION = "streaming_diarization" # All prediction classes that we output should conform to this From 8037e294e365923fa231e0afb0900ca90ce9176c Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:45:42 +0300 Subject: [PATCH 05/14] update benchmarks --- BENCHMARKS.md | 61 ++++++++++++++++----------------------------------- 1 file changed, 19 insertions(+), 42 deletions(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index abdece8..1866ec8 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -479,8 +479,8 @@ ### OpenAI - **Latest Run:** `2025-11-06` -- **Model Version:** `whisper-1` -- **Configuration:** OpenAI's Whisper API for transcription with prompt-based keyword boosting. See [OpenAI Whisper API](https://platform.openai.com/docs/guides/speech-to-text) for more details. +- **Model Version:** `whisper-1` (also known as `large-v2`) +- **Configuration:** OpenAI's Whisper API for transcription with keyword prompting. See [OpenAI Speech to Text: Prompting](https://platform.openai.com/docs/guides/speech-to-text#prompting) for more details. - **Code Reference:** [openbench/pipeline/transcription/transcription_openai.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_openai.py) - **Hardware**: Unknown (Cloud API) @@ -498,14 +498,7 @@ - **Code Reference:** [openbench/pipeline/transcription/transcription_oss_whisper.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_oss_whisper.py) - **Hardware**: M2 Ultra Mac Studio -### Argmax (Parakeet V2) -- **Latest Run:** `2025-11-06` -- **Model Version:** `parakeet-v2` -- **Configuration:** Argmax WhisperKit Pro with compressed Parakeet V2 model (i.e. `parakeet-v2_476MB`) with custom vocabulary support. -- **Code Reference:** [openbench/pipeline/transcription/transcription_whisperkitpro.py](https://github.com/argmaxinc/OpenBench/blob/main/src/openbench/pipeline/transcription/transcription_whisperkitpro.py) -- **Hardware**: M2 Ultra Mac Studio - -### Argmax (Parakeet V3) +### Argmax - **Latest Run:** `2025-11-06` - **Model Version:** `parakeet-v3` - **Configuration:** Argmax SDK WhisperKit Pro framework with compressed Parakeet V3 model (i.e. `parakeet-v3_494MB`) and Custom Vocabulary feature enabled. See [Argmax Custom Vocabulary](https://app.argmaxinc.com/docs/examples/custom-vocabulary) for more details. @@ -531,21 +524,6 @@
-## Keywords - -
-Click to expand - -The dataset consists of trimmed audio samples extracted from 1-2 hour earnings call recordings. Custom vocabulary evaluation uses two keyword categories to assess system performance under different vocabulary conditions: - -**Chunk keywords**: include only the keywords that actually appear in the trimmed audio chunk being evaluated. This represents a focused vocabulary set that matches the content of the specific audio sample. - -**File keywords**: include all keywords from the entire source file (1-2 hours) that the audio chunk was trimmed from. This means the system is provided with a comprehensive vocabulary list that may contain keywords not present in the actual audio chunk being transcribed. This tests a system's ability to handle extensive custom vocabularies without introducing false positives from irrelevant keywords. - -
- -
- ## Word Error Rate (WER)
@@ -559,10 +537,10 @@ The dataset consists of trimmed audio samples extracted from 1-2 hour earnings c
-| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | -|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| -| earnings22-keywords (chunk-level) | 9.32 | 9.74 | 9.68 | 10 | 9.5 | 9.21 | -| earnings22-keywords (file-level) | 9.81 | 9.57 | 9.63 | 10.3 | 10 | 9.63 | +| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(parakeet-v3) | +|----------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:| +| earnings22-keywords
(chunk-keywords) | 9.32 | 9.74 | 9.68 | 10 | 9.21 | +| earnings22-keywords
(file-keywords) | 9.81 | 9.57 | 9.63 | 10.3 | 9.63 |

@@ -585,10 +563,10 @@ If the model predicts 20 keywords and 15 of them match the ground truth, precisi -| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | -|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| -| earnings22-keywords (chunk-level) | 0.98 | 0.98 | 0.98 | 0.96 | 0.96 | 0.96 | -| earnings22-keywords (file-level) | 0.94 | 0.9 | 0.94 | 0.95 | 0.87 | 0.87 | +| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(parakeet-v3) | +|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:| +| earnings22-keywords
(chunk-keywords) |0.98 | 0.98 | 0.98 | 0.96 | 0.96 | +| earnings22-keywords
(file-keywords) |0.94 | 0.9 | 0.94 | 0.95 | 0.87 |

@@ -611,10 +589,10 @@ If the ground-truth transcript has 25 keywords and the model correctly finds 15, -| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | -|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| -| earnings22-keywords (chunk-level) | 0.89 | 0.69 | 0.7 | 0.81 | 0.89 | 0.88 | -| earnings22-keywords (file-level) | 0.83 | 0.79 | 0.68 | 0.76 | 0.86 | 0.85 | +| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(parakeet-v3) | +|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:| +| earnings22-keywords
(chunk-keywords) | 0.89 | 0.69 | 0.7 | 0.81 | 0.88 | +| earnings22-keywords
(file-keywords) | 0.83 | 0.79 | 0.68 | 0.76 | 0.85 |

@@ -638,10 +616,9 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa -| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(Parakeet V2) | Argmax
(Parakeet V3) | -|----------------------|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:|:------------------------:| -| earnings22-keywords (chunk-level) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | -| earnings22-keywords (file-level) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 | +| Dataset | Deepgram
(nova-3) | OpenAI
(whisper-1) | AssemblyAI | Whisper OSS
(large-v3-turbo) | Argmax
(parakeet-v3) | +|----------------------:|:---------------------:|:----------------------:|:----------:|:----------------------:|:------------------------:| +| earnings22-keywords
(chunk-keywords) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | +| earnings22-keywords
(file-keywords) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 |

- From adc13feb35a7ea0209674c0ee621b79776650ea4 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:46:22 +0300 Subject: [PATCH 06/14] Refactor --- BENCHMARKS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index 1866ec8..de5aedf 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -621,4 +621,4 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa | earnings22-keywords
(chunk-keywords) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | | earnings22-keywords
(file-keywords) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 | -

+

\ No newline at end of file From 3bda1d5204a27af0f54695c8002b6376597c15b9 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:46:56 +0300 Subject: [PATCH 07/14] refactor --- BENCHMARKS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index de5aedf..1866ec8 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -621,4 +621,4 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa | earnings22-keywords
(chunk-keywords) | 0.93 | 0.84 | 0.82 | 0.85 | 0.92 | 0.92 | | earnings22-keywords
(file-keywords) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 | -

\ No newline at end of file +

From e9b314cf14508ee98b931c6c48538e2b6d74b9d9 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:47:18 +0300 Subject: [PATCH 08/14] . --- BENCHMARKS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/BENCHMARKS.md b/BENCHMARKS.md index 1866ec8..58e69b0 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -622,3 +622,4 @@ F1 = 2 × (0.75 × 0.6) / (0.75 + 0.6) = **66.7%**, reflecting the model's overa | earnings22-keywords
(file-keywords) | 0.88 | 0.81 | 0.79 | 0.86 | 0.87 | 0.86 |

+ From 1dbc5fe91bf38128428e38a1e8f277a9f79af55b Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Sat, 15 Nov 2025 01:51:06 +0300 Subject: [PATCH 09/14] fix api key --- src/openbench/pipeline/streaming_diarization/deepgram.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openbench/pipeline/streaming_diarization/deepgram.py b/src/openbench/pipeline/streaming_diarization/deepgram.py index 9bc045d..12e41fd 100644 --- a/src/openbench/pipeline/streaming_diarization/deepgram.py +++ b/src/openbench/pipeline/streaming_diarization/deepgram.py @@ -34,7 +34,7 @@ class DeepgramStreamingDiarizationApi: def __init__(self, cfg) -> None: self.realtime_resolution = 0.020 self.model_version = "nova-3" - self.api_key = "38c68bfb62405f1ae17a840777ef531060018c3d" + self.api_key = os.getenv("DEEPGRAM_API_KEY") assert ( self.api_key is not None ), "Please set DEEPGRAM_API_KEY in environment" From 28da8ec581cf00cfbe303541fc292c5cb64ee55a Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Wed, 19 Nov 2025 19:12:08 +0300 Subject: [PATCH 10/14] Add Deepgram streaming orchestration pipeline --- .../DeepgramStreamingDiarizationPipeline.yaml | 8 - ...eepgramStreamingOrchestrationPipeline.yaml | 9 + .../pipeline/orchestration/__init__.py | 6 + .../orchestration_deepgram_streaming.py | 113 +++++++ src/openbench/pipeline/pipeline_aliases.py | 17 + .../streaming_diarization/__init__.py | 13 - .../pipeline/streaming_diarization/common.py | 13 - .../streaming_diarization/deepgram.py | 316 ------------------ .../streaming_transcription/deepgram.py | 155 +++++++-- 9 files changed, 263 insertions(+), 387 deletions(-) delete mode 100644 config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml create mode 100644 config/pipeline_configs/DeepgramStreamingOrchestrationPipeline.yaml create mode 100644 src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py delete mode 100644 src/openbench/pipeline/streaming_diarization/__init__.py delete mode 100644 src/openbench/pipeline/streaming_diarization/common.py delete mode 100644 src/openbench/pipeline/streaming_diarization/deepgram.py diff --git a/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml b/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml deleted file mode 100644 index 3f458e5..0000000 --- a/config/pipeline_configs/DeepgramStreamingDiarizationPipeline.yaml +++ /dev/null @@ -1,8 +0,0 @@ -DeepgramStreamingDiarizationPipeline: - config: - endpoint_url: "wss://api.deepgram.com" - sample_rate: 16000 - channels: 1 - sample_width: 2 - realtime_resolution: 0.020 - model_version: "nova-3" \ No newline at end of file diff --git a/config/pipeline_configs/DeepgramStreamingOrchestrationPipeline.yaml b/config/pipeline_configs/DeepgramStreamingOrchestrationPipeline.yaml new file mode 100644 index 0000000..6d3eae2 --- /dev/null +++ b/config/pipeline_configs/DeepgramStreamingOrchestrationPipeline.yaml @@ -0,0 +1,9 @@ +DeepgramStreamingOrchestrationPipeline: + pipeline_config: + sample_rate: 16000 + channels: 1 + sample_width: 2 + realtime_resolution: 0.020 + model_version: "nova-3" + enable_diarization: true + diff --git a/src/openbench/pipeline/orchestration/__init__.py b/src/openbench/pipeline/orchestration/__init__.py index 5298603..c450180 100644 --- a/src/openbench/pipeline/orchestration/__init__.py +++ b/src/openbench/pipeline/orchestration/__init__.py @@ -2,6 +2,10 @@ # Copyright (C) 2025 Argmax, Inc. All Rights Reserved. from .orchestration_deepgram import DeepgramOrchestrationPipeline, DeepgramOrchestrationPipelineConfig +from .orchestration_deepgram_streaming import ( + DeepgramStreamingOrchestrationPipeline, + DeepgramStreamingOrchestrationPipelineConfig, +) from .orchestration_openai import OpenAIOrchestrationPipeline, OpenAIOrchestrationPipelineConfig from .orchestration_whisperkitpro import WhisperKitProOrchestrationConfig, WhisperKitProOrchestrationPipeline from .whisperx import WhisperXPipeline, WhisperXPipelineConfig @@ -10,6 +14,8 @@ __all__ = [ "DeepgramOrchestrationPipeline", "DeepgramOrchestrationPipelineConfig", + "DeepgramStreamingOrchestrationPipeline", + "DeepgramStreamingOrchestrationPipelineConfig", "WhisperXPipeline", "WhisperXPipelineConfig", "WhisperKitProOrchestrationPipeline", diff --git a/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py b/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py new file mode 100644 index 0000000..4ab01d8 --- /dev/null +++ b/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py @@ -0,0 +1,113 @@ +# For licensing see accompanying LICENSE.md file. +# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. + +import numpy as np +from pydantic import Field + +from ...dataset import OrchestrationSample +from ...pipeline import Pipeline, PipelineConfig, register_pipeline +from ...pipeline_prediction import Transcript, Word +from ...types import PipelineType +from ..streaming_transcription.deepgram import DeepgramApi +from .common import OrchestrationOutput + + +class DeepgramStreamingOrchestrationPipelineConfig(PipelineConfig): + sample_rate: int = Field( + default=16000, + description="Sample rate of the audio" + ) + channels: int = Field( + default=1, + description="Number of audio channels" + ) + sample_width: int = Field( + default=2, + description="Sample width in bytes" + ) + realtime_resolution: float = Field( + default=0.020, + description="Real-time resolution for streaming" + ) + model_version: str = Field( + default="nova-3", + description=( + "The model to use for real-time transcription " + "with diarization" + ) + ) + enable_diarization: bool = Field( + default=True, + description="Whether to enable speaker diarization" + ) + + +@register_pipeline +class DeepgramStreamingOrchestrationPipeline(Pipeline): + _config_class = DeepgramStreamingOrchestrationPipelineConfig + pipeline_type = PipelineType.ORCHESTRATION + + def build_pipeline(self): + """Build Deepgram streaming API with diarization enabled.""" + # Create a modified config for the streaming API + from types import SimpleNamespace + + api_config = SimpleNamespace( + channels=self.config.channels, + sample_width=self.config.sample_width, + sample_rate=self.config.sample_rate, + realtime_resolution=self.config.realtime_resolution, + model_version=self.config.model_version, + enable_diarization=self.config.enable_diarization, + ) + + pipeline = DeepgramApi(api_config) + return pipeline + + def parse_input(self, input_sample: OrchestrationSample): + """Convert audio waveform to bytes for streaming.""" + y = input_sample.waveform + y_int16 = (y * 32767).astype(np.int16) + audio_data_byte = y_int16.T.tobytes() + return audio_data_byte + + def parse_output(self, output) -> OrchestrationOutput: + """Parse output to extract transcription and diarization.""" + # Extract words with speaker info if diarization enabled + words = [] + + if ( + "words_with_speakers" in output and + output["words_with_speakers"] + ): + # This comes from diarization-enabled streaming + for word_info in output["words_with_speakers"]: + words.append(Word( + word=word_info.get("word", ""), + start=word_info.get("start"), + end=word_info.get("end"), + speaker=word_info.get("speaker"), + )) + elif ( + "model_timestamps_confirmed" in output and + output["model_timestamps_confirmed"] + ): + # Fallback to regular transcription without speaker + for timestamp_group in output["model_timestamps_confirmed"]: + for word_info in timestamp_group: + if "word" in word_info: + words.append(Word( + word=word_info.get("word", ""), + start=word_info.get("start"), + end=word_info.get("end"), + speaker=None, + )) + + # Create final transcript with speaker-attributed words + transcript = Transcript(words=words) + + return OrchestrationOutput( + prediction=transcript, + transcription_output=None, + diarization_output=None, + ) diff --git a/src/openbench/pipeline/pipeline_aliases.py b/src/openbench/pipeline/pipeline_aliases.py index b02f759..10c22c5 100644 --- a/src/openbench/pipeline/pipeline_aliases.py +++ b/src/openbench/pipeline/pipeline_aliases.py @@ -16,6 +16,7 @@ ) from .orchestration import ( DeepgramOrchestrationPipeline, + DeepgramStreamingOrchestrationPipeline, OpenAIOrchestrationPipeline, WhisperKitProOrchestrationPipeline, WhisperXPipeline, @@ -170,6 +171,22 @@ def register_pipeline_aliases() -> None: description="Deepgram orchestration pipeline. Requires API key from https://www.deepgram.com/. Set `DEEPGRAM_API_KEY` env var.", ) + PipelineRegistry.register_alias( + "deepgram-streaming-orchestration", + DeepgramStreamingOrchestrationPipeline, + default_config={ + "sample_rate": 16000, + "channels": 1, + "sample_width": 2, + "realtime_resolution": 0.020, + "model_version": "nova-3", + "enable_diarization": True, + }, + description=( + "Deepgram streaming orchestration pipeline with diarization enabled." + ), + ) + PipelineRegistry.register_alias( "whisperkitpro-orchestration-tiny", WhisperKitProOrchestrationPipeline, diff --git a/src/openbench/pipeline/streaming_diarization/__init__.py b/src/openbench/pipeline/streaming_diarization/__init__.py deleted file mode 100644 index 670f0b9..0000000 --- a/src/openbench/pipeline/streaming_diarization/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# For licensing see accompanying LICENSE.md file. -# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. - -from .common import StreamingDiarizationConfig, StreamingDiarizationOutput -from .deepgram import DeepgramStreamingDiarizationPipeline, DeepgramStreamingDiarizationPipelineConfig - -__all__ = [ - "StreamingDiarizationConfig", - "StreamingDiarizationOutput", - "DeepgramStreamingDiarizationPipeline", - "DeepgramStreamingDiarizationPipelineConfig", -] - diff --git a/src/openbench/pipeline/streaming_diarization/common.py b/src/openbench/pipeline/streaming_diarization/common.py deleted file mode 100644 index 2704d9c..0000000 --- a/src/openbench/pipeline/streaming_diarization/common.py +++ /dev/null @@ -1,13 +0,0 @@ -# For licensing see accompanying LICENSE.md file. -# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. - -from ...pipeline_prediction import Transcript -from ..base import PipelineConfig, PipelineOutput - - -class StreamingDiarizationConfig(PipelineConfig): - endpoint_url: str - - -class StreamingDiarizationOutput(PipelineOutput[Transcript]): ... - diff --git a/src/openbench/pipeline/streaming_diarization/deepgram.py b/src/openbench/pipeline/streaming_diarization/deepgram.py deleted file mode 100644 index 12e41fd..0000000 --- a/src/openbench/pipeline/streaming_diarization/deepgram.py +++ /dev/null @@ -1,316 +0,0 @@ -# For licensing see accompanying LICENSE.md file. -# Copyright (C) 2025 Argmax, Inc. All Rights Reserved. - -import asyncio -import json -import os - -import numpy as np -import websockets -from argmaxtools.utils import get_logger -from pyannote.core import Segment -from pydantic import Field - -from openbench.dataset import OrchestrationSample - -from ...pipeline import Pipeline, register_pipeline -from ...pipeline_prediction import ( - DiarizationAnnotation, - StreamingDiarization, - Transcript, - Word, -) -from ...types import PipelineType -from .common import StreamingDiarizationConfig, StreamingDiarizationOutput - - -logger = get_logger(__name__) - -# Some parts of this code are adapted from the Deepgram streaming example at: -# https://developers.deepgram.com/docs/measuring-streaming-latency - - -class DeepgramStreamingDiarizationApi: - def __init__(self, cfg) -> None: - self.realtime_resolution = 0.020 - self.model_version = "nova-3" - self.api_key = os.getenv("DEEPGRAM_API_KEY") - assert ( - self.api_key is not None - ), "Please set DEEPGRAM_API_KEY in environment" - self.channels = cfg.channels - self.sample_width = cfg.sample_width - self.sample_rate = cfg.sample_rate - self.host_url = os.getenv( - "DEEPGRAM_HOST_URL", "wss://api.deepgram.com" - ) - - async def run(self, data, key, channels, sample_width, sample_rate): - """Connect to Deepgram real-time endpoint with diarization. - - This streams audio data in real-time and collects diarization results. - """ - # How many bytes are contained in one second of audio. - byte_rate = sample_width * sample_rate * channels - - # Variables for collecting results - audio_cursor = 0.0 - audio_cursor_l = [] - interim_annotations_l = [] - confirmed_audio_cursor_l = [] - confirmed_interim_annotations_l = [] - final_annotation = DiarizationAnnotation() - transcript_text = "" - words_with_speakers = [] - - # Connect to the real-time streaming endpoint with diarization - url = ( - f"{self.host_url}/v1/listen?" - f"model={self.model_version}&" - f"channels={channels}&" - f"sample_rate={sample_rate}&" - f"encoding=linear16&" - f"interim_results=true&" - f"diarize=true" - ) - async with websockets.connect( - url, - additional_headers={ - "Authorization": "Token {}".format(key), - }, - ) as ws: - - async def sender(ws): - """Sends the data, mimicking real-time connection.""" - nonlocal data, audio_cursor - try: - while len(data): - # Bytes in `REALTIME_RESOLUTION` seconds - i = int(byte_rate * self.realtime_resolution) - chunk, data = data[:i], data[i:] - # Send the data - await ws.send(chunk) - # Move the audio cursor - audio_cursor += self.realtime_resolution - # Mimic real-time by waiting - await asyncio.sleep(self.realtime_resolution) - - # A CloseStream message tells Deepgram that no more audio - # will be sent. Deepgram will close the connection once all - # audio has finished processing. - await ws.send(json.dumps({"type": "CloseStream"})) - except Exception as e: - logger.error(f"Error while sending: {e}") - raise - - async def receiver(ws): - """Collect diarization results from the server.""" - nonlocal audio_cursor - nonlocal interim_annotations_l - nonlocal audio_cursor_l - nonlocal confirmed_interim_annotations_l - nonlocal confirmed_audio_cursor_l - nonlocal final_annotation - nonlocal transcript_text - nonlocal words_with_speakers - - async for msg in ws: - msg = json.loads(msg) - - if "request_id" in msg: - # This is the final metadata message - continue - - # Process words with speaker information - if "channel" in msg and "alternatives" in msg["channel"]: - alternatives = msg["channel"]["alternatives"] - if ( - len(alternatives) > 0 - and "words" in alternatives[0] - ): - words = alternatives[0]["words"] - - # Create annotation from words - annotation = DiarizationAnnotation() - for word_info in words: - if ( - "speaker" in word_info - and "start" in word_info - and "end" in word_info - ): - speaker = word_info["speaker"] - start = word_info["start"] - end = word_info["end"] - segment = Segment(start, end) - annotation[segment] = ( - f"SPEAKER_{speaker}" - ) - - if len(annotation) > 0: - if not msg.get("is_final", False): - # Interim result - audio_cursor_l.append(audio_cursor) - interim_annotations_l.append( - annotation - ) - logger.debug( - f"Interim annotation with " - f"{len(annotation)} segments" - ) - else: - # Confirmed/final result - confirmed_audio_cursor_l.append( - audio_cursor - ) - confirmed_interim_annotations_l.append( - annotation - ) - - # Merge into final annotation - for ( - segment, - _, - speaker, - ) in annotation.itertracks( - yield_label=True - ): - final_annotation[segment] = speaker - - # Collect final transcript and words - for word_info in words: - if ( - "word" in word_info - and "speaker" in word_info - ): - speaker_label = ( - f"SPEAKER_" - f"{word_info['speaker']}" - ) - words_with_speakers.append({ - "word": word_info["word"], - "speaker": speaker_label, - "start": ( - word_info.get("start", 0) - ), - "end": ( - word_info.get("end", 0) - ), - }) - - # Build full transcript with tags - if words_with_speakers: - current_speaker = None - transcript_parts = [] - for w in words_with_speakers: - spk = w["speaker"] - if spk != current_speaker: - if ( - current_speaker - is not None - ): - transcript_parts.append( - "" - ) - transcript_parts.append( - f"[{spk}]" - ) - current_speaker = spk - transcript_parts.append( - w["word"] - ) - transcript_text = " ".join( - transcript_parts - ) - - logger.debug( - f"Confirmed annotation with " - f"{len(annotation)} segments" - ) - - await asyncio.wait([ - asyncio.ensure_future(sender(ws)), - asyncio.ensure_future(receiver(ws)) - ]) - - return ( - final_annotation, - interim_annotations_l, - audio_cursor_l, - confirmed_interim_annotations_l, - confirmed_audio_cursor_l, - transcript_text, - words_with_speakers, - ) - - def __call__(self, sample): - # Sample must be in bytes - ( - final_annotation, - interim_annotations, - audio_cursor_l, - confirmed_interim_annotations, - confirmed_audio_cursor_l, - transcript_text, - words_with_speakers, - ) = asyncio.get_event_loop().run_until_complete( - self.run( - sample, - self.api_key, - self.channels, - self.sample_width, - self.sample_rate, - ) - ) - - return { - "annotation": final_annotation, - "interim_annotations": interim_annotations, - "audio_cursor": audio_cursor_l, - "confirmed_interim_annotations": confirmed_interim_annotations, - "confirmed_audio_cursor": confirmed_audio_cursor_l, - "transcript_text": transcript_text, - "words": words_with_speakers, - } - - -class DeepgramStreamingDiarizationPipelineConfig(StreamingDiarizationConfig): - sample_rate: int - channels: int - sample_width: int - realtime_resolution: float - model_version: str = Field( - default="nova-3", - description="The model to use for real-time diarization" - ) - - -@register_pipeline -class DeepgramStreamingDiarizationPipeline(Pipeline): - _config_class = DeepgramStreamingDiarizationPipelineConfig - pipeline_type = PipelineType.STREAMING_DIARIZATION - - def parse_input(self, input_sample: OrchestrationSample): - y = input_sample.waveform - y_int16 = (y * 32767).astype(np.int16) - audio_data_byte = y_int16.T.tobytes() - return audio_data_byte - - def parse_output(self, output) -> StreamingDiarizationOutput: - # Create Transcript from words with speakers - # For cpWER/WDER, we return transcript as the main prediction - words = [ - Word( - word=w["word"], - start=w.get("start"), - end=w.get("end"), - speaker=w.get("speaker"), - ) - for w in output["words"] - ] - transcript = Transcript(words=words) - - return StreamingDiarizationOutput(prediction=transcript) - - def build_pipeline(self): - pipeline = DeepgramStreamingDiarizationApi(self.config) - return pipeline diff --git a/src/openbench/pipeline/streaming_transcription/deepgram.py b/src/openbench/pipeline/streaming_transcription/deepgram.py index 56bc618..ff6e9fd 100644 --- a/src/openbench/pipeline/streaming_transcription/deepgram.py +++ b/src/openbench/pipeline/streaming_transcription/deepgram.py @@ -26,18 +26,28 @@ class DeepgramApi: def __init__(self, cfg) -> None: - self.realtime_resolution = 0.020 - self.model_version = "nova-3" + self.realtime_resolution = getattr( + cfg, 'realtime_resolution', 0.020 + ) + self.model_version = getattr(cfg, 'model_version', "nova-3") self.api_key = os.getenv("DEEPGRAM_API_KEY") - assert self.api_key is not None, "Please set API key in environment" + assert ( + self.api_key is not None + ), "Please set API key in environment" self.channels = cfg.channels self.sample_width = cfg.sample_width self.sample_rate = cfg.sample_rate - self.host_url = os.getenv("DEEPGRAM_HOST_URL", "wss://api.deepgram.com") + self.host_url = os.getenv( + "DEEPGRAM_HOST_URL", "wss://api.deepgram.com" + ) + self.enable_diarization = getattr( + cfg, 'enable_diarization', False + ) async def run(self, data, key, channels, sample_width, sample_rate): - """Connect to the Deepgram real-time streaming endpoint, stream the data - in real-time, and print out the responses from the server. + """Connect to Deepgram real-time streaming endpoint. + + Stream the data in real-time and print responses from server. This uses a pre-recorded file as an example. It mimics a real-time connection by sending `REALTIME_RESOLUTION` seconds of audio every @@ -62,9 +72,23 @@ async def run(self, data, key, channels, sample_width, sample_rate): confirmed_interim_transcripts = [] model_timestamps_hypothesis = [] model_timestamps_confirmed = [] - # Connect to the real-time streaming endpoint, attaching our API key. + words_with_speakers = [] + + # Build connection URL with optional diarization + url = ( + f"{self.host_url}/v1/listen?" + f"model={self.model_version}&" + f"channels={channels}&" + f"sample_rate={sample_rate}&" + f"encoding=linear16&" + f"interim_results=true" + ) + if self.enable_diarization: + url += "&diarize=true" + + # Connect to the real-time streaming endpoint async with websockets.connect( - f"{self.host_url}/v1/listen?model={self.model_version}&channels={channels}&sample_rate={sample_rate}&encoding=linear16&interim_results=true", + url, additional_headers={ "Authorization": "Token {}".format(key), }, @@ -75,28 +99,27 @@ async def sender(ws): nonlocal data, audio_cursor try: while len(data): - # How many bytes are in `REALTIME_RESOLUTION` seconds of audio + # How many bytes in `REALTIME_RESOLUTION` seconds i = int(byte_rate * self.realtime_resolution) chunk, data = data[:i], data[i:] # Send the data await ws.send(chunk) # Move the audio cursor audio_cursor += self.realtime_resolution - # Mimic real-time by waiting `REALTIME_RESOLUTION` seconds - # before the next packet. + # Mimic real-time by waiting await asyncio.sleep(self.realtime_resolution) - # A CloseStream message tells Deepgram that no more audio - # will be sent. Deepgram will close the connection once all - # audio has finished processing. + # A CloseStream message tells Deepgram that no more + # audio will be sent. Deepgram will close connection + # once all audio has finished processing. await ws.send(json.dumps({"type": "CloseStream"})) except Exception as e: print(f"Error while sending: {e}") raise async def receiver(ws): - """Print out the messages received from the server.""" - nonlocal audio_cursor + """Print out messages received from the server.""" + nonlocal audio_cursor, words_with_speakers global transcript global interim_transcripts global audio_cursor_l @@ -109,28 +132,58 @@ async def receiver(ws): async for msg in ws: msg = json.loads(msg) if "request_id" in msg: - # This is the final metadata message. It gets sent as the - # very last message by Deepgram during a clean shutdown. + # This is the final metadata message. # There is no transcript in it. continue - if msg["channel"]["alternatives"][0]["transcript"] != "": + alternatives = msg["channel"]["alternatives"][0] + if alternatives["transcript"] != "": if not msg["is_final"]: audio_cursor_l.append(audio_cursor) - model_timestamps_hypothesis.append(msg["channel"]["alternatives"][0]["words"]) + model_timestamps_hypothesis.append( + alternatives["words"] + ) interim_transcripts.append( - transcript + " " + msg["channel"]["alternatives"][0]["transcript"] + transcript + " " + alternatives["transcript"] ) logger.debug( - "\n" + "Transcription: " + transcript + msg["channel"]["alternatives"][0]["transcript"] + "\n" + "Transcription: " + transcript + + alternatives["transcript"] ) elif msg["is_final"]: confirmed_audio_cursor_l.append(audio_cursor) - transcript = transcript + " " + msg["channel"]["alternatives"][0]["transcript"] - confirmed_interim_transcripts.append(transcript) - model_timestamps_confirmed.append(msg["channel"]["alternatives"][0]["words"]) + transcript = ( + transcript + " " + alternatives["transcript"] + ) + confirmed_interim_transcripts.append( + transcript + ) + words = alternatives["words"] + model_timestamps_confirmed.append(words) + + # Collect speaker info if diarization enabled + if self.enable_diarization: + for word_info in words: + if "speaker" in word_info: + speaker_label = ( + f"SPEAKER_" + f"{word_info['speaker']}" + ) + words_with_speakers.append({ + "word": word_info.get( + "word", "" + ), + "speaker": speaker_label, + "start": word_info.get( + "start", 0 + ), + "end": word_info.get("end", 0), + }) - await asyncio.wait([asyncio.ensure_future(sender(ws)), asyncio.ensure_future(receiver(ws))]) + await asyncio.wait([ + asyncio.ensure_future(sender(ws)), + asyncio.ensure_future(receiver(ws)) + ]) return ( transcript, interim_transcripts, @@ -139,6 +192,7 @@ async def receiver(ws): confirmed_audio_cursor_l, model_timestamps_hypothesis, model_timestamps_confirmed, + words_with_speakers, ) def __call__(self, sample): @@ -151,17 +205,28 @@ def __call__(self, sample): confirmed_audio_cursor_l, model_timestamps_hypothesis, model_timestamps_confirmed, + words_with_speakers, ) = asyncio.get_event_loop().run_until_complete( - self.run(sample, self.api_key, self.channels, self.sample_width, self.sample_rate) + self.run( + sample, self.api_key, self.channels, + self.sample_width, self.sample_rate + ) ) return { "transcript": transcript, "interim_transcripts": interim_transcripts, "audio_cursor": audio_cursor_l, - "confirmed_interim_transcripts": confirmed_interim_transcripts, + "confirmed_interim_transcripts": ( + confirmed_interim_transcripts + ), "confirmed_audio_cursor": confirmed_audio_cursor_l, - "model_timestamps_hypothesis": model_timestamps_hypothesis, - "model_timestamps_confirmed": model_timestamps_confirmed, + "model_timestamps_hypothesis": ( + model_timestamps_hypothesis + ), + "model_timestamps_confirmed": ( + model_timestamps_confirmed + ), + "words_with_speakers": words_with_speakers, } @@ -170,7 +235,9 @@ class DeepgramStreamingPipelineConfig(StreamingTranscriptionConfig): channels: int sample_width: int realtime_resolution: float - model_version: str = Field(..., description="The model to use for real-time transcription") + model_version: str = Field( + ..., description="The model to use for real-time transcription" + ) @register_pipeline @@ -184,19 +251,31 @@ def parse_input(self, input_sample: StreamingSample): audio_data_byte = y_int16.T.tobytes() return audio_data_byte - def parse_output(self, output) -> StreamingTranscriptionOutput: - model_timestamps_hypothesis = output["model_timestamps_hypothesis"] - model_timestamps_confirmed = output["model_timestamps_confirmed"] + def parse_output( + self, output + ) -> StreamingTranscriptionOutput: + model_timestamps_hypothesis = ( + output["model_timestamps_hypothesis"] + ) + model_timestamps_confirmed = ( + output["model_timestamps_confirmed"] + ) if model_timestamps_hypothesis is not None: model_timestamps_hypothesis = [ - [{"start": word["start"], "end": word["end"]} for word in interim_result_words] + [ + {"start": word["start"], "end": word["end"]} + for word in interim_result_words + ] for interim_result_words in model_timestamps_hypothesis ] if model_timestamps_confirmed is not None: model_timestamps_confirmed = [ - [{"start": word["start"], "end": word["end"]} for word in interim_result_words] + [ + {"start": word["start"], "end": word["end"]} + for word in interim_result_words + ] for interim_result_words in model_timestamps_confirmed ] @@ -205,7 +284,9 @@ def parse_output(self, output) -> StreamingTranscriptionOutput: audio_cursor=output["audio_cursor"], interim_results=output["interim_transcripts"], confirmed_audio_cursor=output["confirmed_audio_cursor"], - confirmed_interim_results=output["confirmed_interim_transcripts"], + confirmed_interim_results=( + output["confirmed_interim_transcripts"] + ), model_timestamps_hypothesis=model_timestamps_hypothesis, model_timestamps_confirmed=model_timestamps_confirmed, ) From 6c6a394d1fe266d147eaebcab6fcd29173cafeff Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Wed, 19 Nov 2025 19:40:56 +0300 Subject: [PATCH 11/14] remove streaming diarization --- src/openbench/dataset/dataset_aliases.py | 1 - src/openbench/dataset/dataset_registry.py | 7 +-- src/openbench/metric/registry.py | 6 +- .../word_error_metrics/word_error_metrics.py | 5 +- src/openbench/pipeline/__init__.py | 1 - src/openbench/pipeline_prediction.py | 56 ------------------- src/openbench/runner/benchmark.py | 2 - src/openbench/types.py | 1 - 8 files changed, 3 insertions(+), 76 deletions(-) diff --git a/src/openbench/dataset/dataset_aliases.py b/src/openbench/dataset/dataset_aliases.py index 99bab3a..14d7ac9 100644 --- a/src/openbench/dataset/dataset_aliases.py +++ b/src/openbench/dataset/dataset_aliases.py @@ -172,7 +172,6 @@ def register_dataset_aliases() -> None: supported_pipeline_types={ PipelineType.TRANSCRIPTION, PipelineType.ORCHESTRATION, - PipelineType.STREAMING_DIARIZATION, }, description=( "Callhome English dataset for transcription and orchestration evaluation. " diff --git a/src/openbench/dataset/dataset_registry.py b/src/openbench/dataset/dataset_registry.py index 1fdfef7..f73ae17 100644 --- a/src/openbench/dataset/dataset_registry.py +++ b/src/openbench/dataset/dataset_registry.py @@ -137,10 +137,5 @@ def has_alias(cls, alias: str) -> bool: # Register all datasets DatasetRegistry.register(PipelineType.DIARIZATION, DiarizationDataset) DatasetRegistry.register(PipelineType.ORCHESTRATION, OrchestrationDataset) -DatasetRegistry.register( - PipelineType.STREAMING_TRANSCRIPTION, StreamingDataset -) -DatasetRegistry.register( - PipelineType.STREAMING_DIARIZATION, OrchestrationDataset -) +DatasetRegistry.register(PipelineType.STREAMING_TRANSCRIPTION, StreamingDataset) DatasetRegistry.register(PipelineType.TRANSCRIPTION, TranscriptionDataset) diff --git a/src/openbench/metric/registry.py b/src/openbench/metric/registry.py index d8eec6b..99e991b 100644 --- a/src/openbench/metric/registry.py +++ b/src/openbench/metric/registry.py @@ -135,11 +135,7 @@ def get_available_metrics(cls, pipeline_type: PipelineType) -> list[MetricOption # Register all existing and interesting metrics from pyannote.metrics # Custom metrics will be registered in their own files -MetricRegistry.register( - DiarizationErrorRate, - (PipelineType.DIARIZATION, PipelineType.STREAMING_DIARIZATION), - MetricOptions.DER, -) +MetricRegistry.register(DiarizationErrorRate, PipelineType.DIARIZATION, MetricOptions.DER) MetricRegistry.register(JaccardErrorRate, PipelineType.DIARIZATION, MetricOptions.JER) MetricRegistry.register(DiarizationPurity, PipelineType.DIARIZATION, MetricOptions.DIARIZATION_PURITY) MetricRegistry.register( diff --git a/src/openbench/metric/word_error_metrics/word_error_metrics.py b/src/openbench/metric/word_error_metrics/word_error_metrics.py index c3719ae..5b45d75 100644 --- a/src/openbench/metric/word_error_metrics/word_error_metrics.py +++ b/src/openbench/metric/word_error_metrics/word_error_metrics.py @@ -90,9 +90,7 @@ def _get_word_error_metrics( return result.ops[0], (ref_words, hyp_words), (ref_speakers, hyp_speakers) -@MetricRegistry.register_metric( - (PipelineType.ORCHESTRATION, PipelineType.STREAMING_DIARIZATION), MetricOptions.WDER -) +@MetricRegistry.register_metric(PipelineType.ORCHESTRATION, MetricOptions.WDER) class WordDiarizationErrorRate(BaseWordErrorMetric): """Word Diarization Error Rate (WDER) implementation. @@ -224,7 +222,6 @@ def compute_metric(self, detail: Details) -> float: PipelineType.TRANSCRIPTION, PipelineType.ORCHESTRATION, PipelineType.STREAMING_TRANSCRIPTION, - PipelineType.STREAMING_DIARIZATION, ), MetricOptions.WER, ) diff --git a/src/openbench/pipeline/__init__.py b/src/openbench/pipeline/__init__.py index bc405bd..598328b 100644 --- a/src/openbench/pipeline/__init__.py +++ b/src/openbench/pipeline/__init__.py @@ -6,7 +6,6 @@ from .diarization import * from .orchestration import * from .pipeline_registry import PipelineRegistry -from .streaming_diarization import * from .streaming_transcription import * from .transcription import * diff --git a/src/openbench/pipeline_prediction.py b/src/openbench/pipeline_prediction.py index fc9ddb0..030d6d5 100644 --- a/src/openbench/pipeline_prediction.py +++ b/src/openbench/pipeline_prediction.py @@ -274,59 +274,3 @@ def to_annotation_file(self, output_dir: str, filename: str) -> str: json.dump(data, f, indent=2) return path - - -# Streaming Diarization Prediction -class StreamingDiarization(BaseModel): - """Streaming diarization output combining real-time and diarization.""" - - annotation: DiarizationAnnotation = Field( - ..., description="The final diarization annotation" - ) - transcript: "Transcript | None" = Field( - None, description="The transcript with speaker information" - ) - audio_cursor: list[float] | None = Field( - None, description="The audio cursor in seconds for interim results" - ) - interim_annotations: list[DiarizationAnnotation] | None = Field( - None, description="Interim diarization annotations" - ) - confirmed_audio_cursor: list[float] | None = Field( - None, description="The confirmed audio cursor in seconds" - ) - confirmed_interim_annotations: list[DiarizationAnnotation] | None = Field( - None, description="The confirmed interim diarization annotations" - ) - - class Config: - arbitrary_types_allowed = True - - def to_annotation_file(self, output_dir: str, filename: str) -> str: - """Save both the final annotation and streaming metadata.""" - # Save final annotation as RTTM - rttm_path = os.path.join(output_dir, f"{filename}.rttm") - with open(rttm_path, "w") as f: - self.annotation.write_rttm(f) - - # Save streaming metadata as JSON - json_path = os.path.join(output_dir, f"{filename}_streaming.json") - data = { - "audio_cursor": self.audio_cursor, - "confirmed_audio_cursor": self.confirmed_audio_cursor, - "num_interim_annotations": ( - len(self.interim_annotations) - if self.interim_annotations - else 0 - ), - "num_confirmed_interim_annotations": ( - len(self.confirmed_interim_annotations) - if self.confirmed_interim_annotations - else 0 - ), - } - - with open(json_path, "w") as f: - json.dump(data, f, indent=2) - - return rttm_path diff --git a/src/openbench/runner/benchmark.py b/src/openbench/runner/benchmark.py index c2d0f3f..b923fa3 100644 --- a/src/openbench/runner/benchmark.py +++ b/src/openbench/runner/benchmark.py @@ -33,7 +33,6 @@ PipelineType.TRANSCRIPTION: TranscriptionSampleResult, PipelineType.ORCHESTRATION: TranscriptionSampleResult, PipelineType.STREAMING_TRANSCRIPTION: TranscriptionSampleResult, - PipelineType.STREAMING_DIARIZATION: TranscriptionSampleResult, } @@ -65,7 +64,6 @@ def __init__(self, config: BenchmarkConfig, pipelines: list[Pipeline]): PipelineType.TRANSCRIPTION: TranscriptionWandbLogger, PipelineType.ORCHESTRATION: TranscriptionWandbLogger, PipelineType.STREAMING_TRANSCRIPTION: TranscriptionWandbLogger, - PipelineType.STREAMING_DIARIZATION: TranscriptionWandbLogger, } def _get_metrics(self, pipeline: Pipeline) -> dict[str, BaseMetric]: diff --git a/src/openbench/types.py b/src/openbench/types.py index 8fa1398..b4bbeaa 100644 --- a/src/openbench/types.py +++ b/src/openbench/types.py @@ -12,7 +12,6 @@ class PipelineType(Enum): TRANSCRIPTION = "transcription" ORCHESTRATION = "orchestration" STREAMING_TRANSCRIPTION = "streaming_transcription" - STREAMING_DIARIZATION = "streaming_diarization" # All prediction classes that we output should conform to this From d350e0d468c5e19ac1c5f1f741e03346ff95242d Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Wed, 17 Dec 2025 20:19:32 +0300 Subject: [PATCH 12/14] Fix merge conflict --- .../orchestration_deepgram_streaming.py | 69 ++++------- src/openbench/pipeline/pipeline_aliases.py | 4 +- .../streaming_transcription/deepgram.py | 107 +++++------------- src/openbench/runner/benchmark.py | 8 +- 4 files changed, 56 insertions(+), 132 deletions(-) diff --git a/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py b/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py index 4ab01d8..ce0240f 100644 --- a/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py +++ b/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py @@ -13,33 +13,14 @@ class DeepgramStreamingOrchestrationPipelineConfig(PipelineConfig): - sample_rate: int = Field( - default=16000, - description="Sample rate of the audio" - ) - channels: int = Field( - default=1, - description="Number of audio channels" - ) - sample_width: int = Field( - default=2, - description="Sample width in bytes" - ) - realtime_resolution: float = Field( - default=0.020, - description="Real-time resolution for streaming" - ) + sample_rate: int = Field(default=16000, description="Sample rate of the audio") + channels: int = Field(default=1, description="Number of audio channels") + sample_width: int = Field(default=2, description="Sample width in bytes") + realtime_resolution: float = Field(default=0.020, description="Real-time resolution for streaming") model_version: str = Field( - default="nova-3", - description=( - "The model to use for real-time transcription " - "with diarization" - ) - ) - enable_diarization: bool = Field( - default=True, - description="Whether to enable speaker diarization" + default="nova-3", description=("The model to use for real-time transcription with diarization") ) + enable_diarization: bool = Field(default=True, description="Whether to enable speaker diarization") @register_pipeline @@ -76,32 +57,30 @@ def parse_output(self, output) -> OrchestrationOutput: # Extract words with speaker info if diarization enabled words = [] - if ( - "words_with_speakers" in output and - output["words_with_speakers"] - ): + if "words_with_speakers" in output and output["words_with_speakers"]: # This comes from diarization-enabled streaming for word_info in output["words_with_speakers"]: - words.append(Word( - word=word_info.get("word", ""), - start=word_info.get("start"), - end=word_info.get("end"), - speaker=word_info.get("speaker"), - )) - elif ( - "model_timestamps_confirmed" in output and - output["model_timestamps_confirmed"] - ): + words.append( + Word( + word=word_info.get("word", ""), + start=word_info.get("start"), + end=word_info.get("end"), + speaker=word_info.get("speaker"), + ) + ) + elif "model_timestamps_confirmed" in output and output["model_timestamps_confirmed"]: # Fallback to regular transcription without speaker for timestamp_group in output["model_timestamps_confirmed"]: for word_info in timestamp_group: if "word" in word_info: - words.append(Word( - word=word_info.get("word", ""), - start=word_info.get("start"), - end=word_info.get("end"), - speaker=None, - )) + words.append( + Word( + word=word_info.get("word", ""), + start=word_info.get("start"), + end=word_info.get("end"), + speaker=None, + ) + ) # Create final transcript with speaker-attributed words transcript = Transcript(words=words) diff --git a/src/openbench/pipeline/pipeline_aliases.py b/src/openbench/pipeline/pipeline_aliases.py index 2379a88..02f6ed9 100644 --- a/src/openbench/pipeline/pipeline_aliases.py +++ b/src/openbench/pipeline/pipeline_aliases.py @@ -183,9 +183,7 @@ def register_pipeline_aliases() -> None: "model_version": "nova-3", "enable_diarization": True, }, - description=( - "Deepgram streaming orchestration pipeline with diarization enabled." - ), + description=("Deepgram streaming orchestration pipeline with diarization enabled."), ) PipelineRegistry.register_alias( diff --git a/src/openbench/pipeline/streaming_transcription/deepgram.py b/src/openbench/pipeline/streaming_transcription/deepgram.py index 64e69fd..ed5c42d 100644 --- a/src/openbench/pipeline/streaming_transcription/deepgram.py +++ b/src/openbench/pipeline/streaming_transcription/deepgram.py @@ -26,23 +26,15 @@ class DeepgramApi: def __init__(self, cfg) -> None: - self.realtime_resolution = getattr( - cfg, 'realtime_resolution', 0.020 - ) - self.model_version = getattr(cfg, 'model_version', "nova-3") + self.realtime_resolution = getattr(cfg, "realtime_resolution", 0.020) + self.model_version = getattr(cfg, "model_version", "nova-3") self.api_key = os.getenv("DEEPGRAM_API_KEY") - assert ( - self.api_key is not None - ), "Please set API key in environment" + assert self.api_key is not None, "Please set API key in environment" self.channels = cfg.channels self.sample_width = cfg.sample_width self.sample_rate = cfg.sample_rate - self.host_url = os.getenv( - "DEEPGRAM_HOST_URL", "wss://api.deepgram.com" - ) - self.enable_diarization = getattr( - cfg, 'enable_diarization', False - ) + self.host_url = os.getenv("DEEPGRAM_HOST_URL", "wss://api.deepgram.com") + self.enable_diarization = getattr(cfg, "enable_diarization", False) async def run(self, data, key, channels, sample_width, sample_rate): """Connect to Deepgram real-time streaming endpoint. @@ -136,25 +128,14 @@ async def receiver(ws): if alternatives["transcript"] != "": if not msg["is_final"]: audio_cursor_l.append(audio_cursor) - model_timestamps_hypothesis.append( - alternatives["words"] - ) - interim_transcripts.append( - transcript + " " + alternatives["transcript"] - ) - logger.debug( - "\n" + "Transcription: " + transcript + - alternatives["transcript"] - ) + model_timestamps_hypothesis.append(alternatives["words"]) + interim_transcripts.append(transcript + " " + alternatives["transcript"]) + logger.debug("\n" + "Transcription: " + transcript + alternatives["transcript"]) elif msg["is_final"]: confirmed_audio_cursor_l.append(audio_cursor) - transcript = ( - transcript + " " + alternatives["transcript"] - ) - confirmed_interim_transcripts.append( - transcript - ) + transcript = transcript + " " + alternatives["transcript"] + confirmed_interim_transcripts.append(transcript) words = alternatives["words"] model_timestamps_confirmed.append(words) @@ -162,20 +143,15 @@ async def receiver(ws): if self.enable_diarization: for word_info in words: if "speaker" in word_info: - speaker_label = ( - f"SPEAKER_" - f"{word_info['speaker']}" + speaker_label = f"SPEAKER_{word_info['speaker']}" + words_with_speakers.append( + { + "word": word_info.get("word", ""), + "speaker": speaker_label, + "start": word_info.get("start", 0), + "end": word_info.get("end", 0), + } ) - words_with_speakers.append({ - "word": word_info.get( - "word", "" - ), - "speaker": speaker_label, - "start": word_info.get( - "start", 0 - ), - "end": word_info.get("end", 0), - }) await asyncio.gather(sender(ws), receiver(ws)) return ( @@ -201,25 +177,16 @@ def __call__(self, sample): model_timestamps_confirmed, words_with_speakers, ) = asyncio.get_event_loop().run_until_complete( - self.run( - sample, self.api_key, self.channels, - self.sample_width, self.sample_rate - ) + self.run(sample, self.api_key, self.channels, self.sample_width, self.sample_rate) ) return { "transcript": transcript, "interim_transcripts": interim_transcripts, "audio_cursor": audio_cursor_l, - "confirmed_interim_transcripts": ( - confirmed_interim_transcripts - ), + "confirmed_interim_transcripts": (confirmed_interim_transcripts), "confirmed_audio_cursor": confirmed_audio_cursor_l, - "model_timestamps_hypothesis": ( - model_timestamps_hypothesis - ), - "model_timestamps_confirmed": ( - model_timestamps_confirmed - ), + "model_timestamps_hypothesis": (model_timestamps_hypothesis), + "model_timestamps_confirmed": (model_timestamps_confirmed), "words_with_speakers": words_with_speakers, } @@ -229,9 +196,7 @@ class DeepgramStreamingPipelineConfig(StreamingTranscriptionConfig): channels: int sample_width: int realtime_resolution: float - model_version: str = Field( - ..., description="The model to use for real-time transcription" - ) + model_version: str = Field(..., description="The model to use for real-time transcription") @register_pipeline @@ -245,31 +210,19 @@ def parse_input(self, input_sample: StreamingSample): audio_data_byte = y_int16.T.tobytes() return audio_data_byte - def parse_output( - self, output - ) -> StreamingTranscriptionOutput: - model_timestamps_hypothesis = ( - output["model_timestamps_hypothesis"] - ) - model_timestamps_confirmed = ( - output["model_timestamps_confirmed"] - ) + def parse_output(self, output) -> StreamingTranscriptionOutput: + model_timestamps_hypothesis = output["model_timestamps_hypothesis"] + model_timestamps_confirmed = output["model_timestamps_confirmed"] if model_timestamps_hypothesis is not None: model_timestamps_hypothesis = [ - [ - {"start": word["start"], "end": word["end"]} - for word in interim_result_words - ] + [{"start": word["start"], "end": word["end"]} for word in interim_result_words] for interim_result_words in model_timestamps_hypothesis ] if model_timestamps_confirmed is not None: model_timestamps_confirmed = [ - [ - {"start": word["start"], "end": word["end"]} - for word in interim_result_words - ] + [{"start": word["start"], "end": word["end"]} for word in interim_result_words] for interim_result_words in model_timestamps_confirmed ] @@ -278,9 +231,7 @@ def parse_output( audio_cursor=output["audio_cursor"], interim_results=output["interim_transcripts"], confirmed_audio_cursor=output["confirmed_audio_cursor"], - confirmed_interim_results=( - output["confirmed_interim_transcripts"] - ), + confirmed_interim_results=(output["confirmed_interim_transcripts"]), model_timestamps_hypothesis=model_timestamps_hypothesis, model_timestamps_confirmed=model_timestamps_confirmed, ) diff --git a/src/openbench/runner/benchmark.py b/src/openbench/runner/benchmark.py index 9a22f7a..439c15c 100644 --- a/src/openbench/runner/benchmark.py +++ b/src/openbench/runner/benchmark.py @@ -114,12 +114,8 @@ def _process_single_sample( ) if pipeline.pipeline_type == PipelineType.DIARIZATION: - sample_results_attributes["num_speakers_predicted"] = ( - output.prediction.num_speakers - ) - sample_results_attributes["num_speakers_reference"] = ( - sample.reference.num_speakers - ) + sample_results_attributes["num_speakers_predicted"] = output.prediction.num_speakers + sample_results_attributes["num_speakers_reference"] = sample.reference.num_speakers sample_result = sample_result_class(**sample_results_attributes) From 0b5dcfc8eeb5cf4acc8caf2720a70443308839be Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Wed, 17 Dec 2025 20:29:46 +0300 Subject: [PATCH 13/14] fix pipeline type --- src/openbench/metric/word_error_metrics/word_error_metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openbench/metric/word_error_metrics/word_error_metrics.py b/src/openbench/metric/word_error_metrics/word_error_metrics.py index e9c0a9e..fc4cadb 100644 --- a/src/openbench/metric/word_error_metrics/word_error_metrics.py +++ b/src/openbench/metric/word_error_metrics/word_error_metrics.py @@ -298,7 +298,7 @@ def compute_metric(self, detail: Details) -> float: @MetricRegistry.register_metric( - (PipelineType.ORCHESTRATION, PipelineType.STREAMING_DIARIZATION), + PipelineType.ORCHESTRATION, MetricOptions.CPWER, ) class ConcatenatedMinimumPermutationWER(BaseWordErrorMetric): From cad24b6fa97360b561c043d5deba46356f31ddd1 Mon Sep 17 00:00:00 2001 From: dberkin1 Date: Wed, 31 Dec 2025 20:07:44 +0300 Subject: [PATCH 14/14] Raise error when speaker diarization data is not available in orchestration pipeline --- .../orchestration_deepgram_streaming.py | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py b/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py index ce0240f..1d2be74 100644 --- a/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py +++ b/src/openbench/pipeline/orchestration/orchestration_deepgram_streaming.py @@ -68,19 +68,13 @@ def parse_output(self, output) -> OrchestrationOutput: speaker=word_info.get("speaker"), ) ) - elif "model_timestamps_confirmed" in output and output["model_timestamps_confirmed"]: - # Fallback to regular transcription without speaker - for timestamp_group in output["model_timestamps_confirmed"]: - for word_info in timestamp_group: - if "word" in word_info: - words.append( - Word( - word=word_info.get("word", ""), - start=word_info.get("start"), - end=word_info.get("end"), - speaker=None, - ) - ) + else: + # Speaker labels are required for orchestration pipelines + raise ValueError( + "No speaker diarization data available. " + "Orchestration pipelines require speaker labels. " + "Ensure 'enable_diarization' is set to True in the pipeline config." + ) # Create final transcript with speaker-attributed words transcript = Transcript(words=words)