Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ assembly stream -o text | grep --line-buffered -i alex \
assembly transcribe --sample --llm "summarize" --llm "translate the summary to French"
```

**Score diarization quality across several videos** — pipe a list of URLs into batch mode (`--from-stdin`), transcribe them in parallel with speaker labels, have an LLM judge each transcript, then use `--llm-reduce` to run one prompt over all the results for a single aggregate verdict:

```sh
printf '%s\n' \
https://youtu.be/RC5zRvqnRm8 \
https://youtu.be/u9S41Kplsbs \
https://youtu.be/mP31CdpGzUY \
| assembly transcribe --from-stdin --concurrency 3 --speaker-labels \
--llm 'Judge diarization quality; output JSON {speaker_count, issues, score}' \
--llm-reduce 'Rank these videos worst-to-best and summarize the failure modes'
```

**Talk to a voice agent in your terminal** — full-duplex, around 20 voices:

```sh
Expand Down
10 changes: 9 additions & 1 deletion REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,15 @@ each carrying a `"type"` field to dispatch on:
| `assembly agent-cascade --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` |
| `assembly dictate --json` | `utterance` |
| `assembly llm --follow --json` | `answer` |
| `assembly transcribe <batch> --json` | `result` (one per source) |
| `assembly transcribe <batch> --json` | `result` (one per source), then `reduce` if `--llm-reduce` is set |

New event types may be added; existing fields are stable. Consumers should
ignore types they don't recognize.

With `--llm-reduce`, batch mode emits one final `reduce` record after the
per-source `result` records: a JSON object with `"type": "reduce"` plus `model`,
`prompts`, and `output` fields. The aggregate prompt(s) run once over all the
results combined, not once per source. Without `--json`, the output is printed
to stdout and the progress table is routed to stderr, so stdout stays clean for
piping. `--llm-reduce` is repeatable, each prompt running on the previous one's
output; for a single source it extends the `--llm` chain over that transcript.
78 changes: 74 additions & 4 deletions aai_cli/app/transcribe/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,19 +227,22 @@ def _render_table(items: list[_Item]) -> Table:


@contextmanager
def _progress_table(items: list[_Item], *, json_mode: bool) -> Generator[None]:
def _progress_table(items: list[_Item], *, json_mode: bool, reduce_active: bool) -> Generator[None]:
"""Render the batch as a live-updating table (human mode).

Rich renders nothing while running on a non-interactive console and prints the
final frame once on stop, so piped/agent runs still get the result table. JSON
mode skips Rich entirely — NDJSON per source is the output.
mode skips Rich entirely — NDJSON per source is the output. When a --llm-reduce
step will print the aggregate to stdout, the table goes to stderr so stdout
carries only the reduce result.
"""
if json_mode:
yield
return
console = output.error_console if reduce_active else output.console
with Live(
get_renderable=lambda: _render_table(items),
console=output.console,
console=console,
refresh_per_second=4, # pragma: no mutate (cosmetic refresh cadence)
):
yield
Expand Down Expand Up @@ -295,6 +298,70 @@ def _summarize(items: list[_Item], *, json_mode: bool, quiet: bool) -> None:
output.error_console.print(output.success(f"Transcribed {completed}, skipped {skipped}."))


def _reduce_input(record: dict[str, object]) -> str:
    """Pick the text a single source contributes to the reduce step.

    Preference order: the ``output`` of the last recorded ``--llm`` step, then the
    transcript ``text``. A missing or falsy value at the chosen level yields ``""``.
    """
    llm_section = jsonshape.as_mapping(record.get("transform"))
    llm_steps = (
        jsonshape.mapping_list(llm_section.get("steps"))
        if llm_section is not None
        else None
    )
    if llm_steps:
        # Steps were recorded: the final step's output wins, even when it is
        # empty — there is deliberately no fall-through to the transcript text.
        final_output = llm_steps[-1].get("output", "")
        return str(final_output or "")

    transcript_section = jsonshape.as_mapping(record.get("transcript"))
    if transcript_section is None:
        return ""
    return str(transcript_section.get("text", "") or "")


def _gather_reduce_inputs(items: list[_Item]) -> str:
    """Build the combined reduce input from every completed or skipped source.

    Each source with a non-empty contribution becomes a ``### Source: <source>``
    header followed by its text; the sections are joined with a blank line.
    Sources in any other status, without a readable sidecar record, or with an
    empty contribution are left out. Returns ``""`` when nothing qualifies.
    """
    sections: list[str] = []
    eligible = (item for item in items if item.status in ("completed", "skipped"))
    for item in eligible:
        # The per-source result is read back from its sidecar file.
        saved = resumable_record(sidecar_path(item.source), digest=None)
        if saved is None:
            continue
        contribution = _reduce_input(saved)
        if contribution:
            sections.append(f"### Source: {item.source}\n{contribution}")
    return "\n\n".join(sections)


def _run_reduce(
api_key: str,
items: list[_Item],
*,
transform: transcribe_exec.TransformOptions,
json_mode: bool,
) -> None:
"""Run the --llm-reduce chain once over every source's result; print to stdout."""
combined = _gather_reduce_inputs(items)
if not combined:
# Every source had empty transcript text and no --llm output, so there is
# nothing to aggregate — skip the (billable) Gateway call rather than prompt
# it over an empty transcript and print a meaningless answer to stdout.
output.emit_warning(
"Nothing to reduce: no transcript text across sources.", json_mode=json_mode
)
return
result = llm.run_chain(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `llm.run_chain` call receives `transcript_text` assembled from user transcripts; do not send user transcripts to an external service without sanitization or explicit opt-in.

Details

✨ AI Reasoning
​The new _run_reduce calls llm.run_chain(api_key, transform.reduce_prompts, transcript_text=combined, ...). The combined string is constructed from user transcripts (see _gather_reduce_inputs) and is passed unchanged as transcript_text to an external LLM-Gateway. This is effectively logging/exfiltrating user-controlled content to a third party and may leak PII or otherwise sensitive data if not sanitized or consented to.

🔧 How do I fix it?
Keep sensitive data such as emails, passwords, and tokens out of logs. When logging values tied to a user, prefer a safe identifier like a user ID over the raw input, and strip line breaks from any user-provided text you do log.

Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.
More info

api_key,
transform.reduce_prompts,
transcript_text=combined,
model=transform.model,
max_tokens=transform.max_tokens,
)
if json_mode:
# Additive NDJSON event after the per-source {"type":"result"} records.
output.emit_ndjson(
{
"type": "reduce",
"model": transform.model,
"prompts": transform.reduce_prompts,
"output": result,
}
)
else:
output.emit_text(result)


def run_batch(
api_key: str,
sources: list[str],
Expand All @@ -312,7 +379,8 @@ def run_batch(
code; a re-run resumes from the sidecars and retries only the failures.
"""
items = [_Item(source) for source in sources]
with _progress_table(items, json_mode=json_mode):
reduce_active = bool(transform.reduce_prompts)
with _progress_table(items, json_mode=json_mode, reduce_active=reduce_active):
_drain(
api_key,
items,
Expand All @@ -323,3 +391,5 @@ def run_batch(
json_mode=json_mode,
)
_summarize(items, json_mode=json_mode, quiet=quiet)
if reduce_active:
_run_reduce(api_key, items, transform=transform, json_mode=json_mode)
36 changes: 30 additions & 6 deletions aai_cli/app/transcribe/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,24 @@ def run_transcription(


class TransformOptions(NamedTuple):
    """The ``--llm`` chain options: the prompts plus the gateway model settings.

    Fields:
        prompts: the ``--llm`` prompts, in the order they are chained.
        model: the model name passed to the LLM calls.
        max_tokens: the token limit passed to the LLM calls.
        reduce_prompts: the ``--llm-reduce`` chain — the aggregate step run over
            all batch results (or appended to the per-transcript chain for a
            single source).
    """

    prompts: list[str]
    model: str
    max_tokens: int
    reduce_prompts: list[str]

    def chain(self) -> list[str]:
        """Return the full single-source chain: map prompts, then reduce prompts.

        With one source there is nothing to aggregate, so the reduce prompts simply
        extend the ``--llm`` chain over that transcript. A new list is returned on
        every call, so callers may mutate it without touching the stored fields.
        """
        return [*self.prompts, *self.reduce_prompts]


def deliver_result(
Expand Down Expand Up @@ -137,12 +150,15 @@ def deliver_result(
)
return

if transform.prompts:
chain = transform.chain()
if chain:
# Chain the prompts: the first runs over the transcript (injected server-side
# via transcript_id); each subsequent prompt runs over the prior response.
# --llm-reduce prompts extend the chain here — a single source has nothing to
# aggregate, so reduce is just more chain steps over this one transcript.
steps = llm.run_chain_steps(
api_key,
transform.prompts,
chain,
transcript_id=transcript.id,
model=transform.model,
max_tokens=transform.max_tokens,
Expand Down Expand Up @@ -213,6 +229,7 @@ class TranscribeOptions:
config_kv: list[str] | None
config_file: Path | None
llm_prompt: list[str] | None
llm_reduce: list[str] | None
model: str
max_tokens: int
output_field: choices.TranscriptOutput | None
Expand Down Expand Up @@ -272,7 +289,10 @@ def flags(self, pii_policies: list[str] | None) -> dict[str, object]:
def transform_options(self) -> TransformOptions:
    """The post-transcription LLM transform spec built from the `--llm` flags.

    ``llm_prompt`` and ``llm_reduce`` may each be ``None`` (flag not given); both
    are normalized to fresh lists so the returned options never alias the parsed
    CLI values. ``model`` and ``max_tokens`` are passed through unchanged.
    """
    # Each keyword is passed exactly once: repeating a keyword argument in a
    # call is a SyntaxError.
    return TransformOptions(
        prompts=list(self.llm_prompt or []),
        model=self.model,
        max_tokens=self.max_tokens,
        reduce_prompts=list(self.llm_reduce or []),
    )


Expand All @@ -292,7 +312,9 @@ def _print_show_code(opts: TranscribeOptions, merged: dict[str, object]) -> None
if opts.source or opts.sample
else "your-audio-file.mp3"
)
gateway = code_gen.gateway_options(list(opts.llm_prompt or []), opts.model, opts.max_tokens)
gateway = code_gen.gateway_options(
list(opts.llm_prompt or []) + list(opts.llm_reduce or []), opts.model, opts.max_tokens
)
output.print_code(
render_transcribe_code(
merged,
Expand All @@ -317,7 +339,9 @@ def run_transcribe(opts: TranscribeOptions, state: AppState, *, json_mode: bool)
transcribe_validate.validate_pii_policies(pii_policies)
flags = opts.flags(pii_policies)

transcribe_validate.validate_out_with_llm(opts.out, opts.llm_prompt)
transcribe_validate.validate_out_with_llm(
opts.out, (opts.llm_prompt or []) + (opts.llm_reduce or []) or None
)
transcribe_validate.validate_out_path(opts.out)
transcribe_validate.validate_json_with_output(opts.output_field, json_mode=json_mode)
client.validate_chars_per_caption(opts.chars_per_caption, opts.output_field)
Expand Down
9 changes: 9 additions & 0 deletions aai_cli/commands/transcribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,14 @@ def transcribe(
"prompt runs on the previous one's response (a chain), the first on the transcript.",
rich_help_panel=help_panels.OPT_LLM,
),
llm_reduce: list[str] | None = typer.Option(
None,
"--llm-reduce",
help="Run one LLM-Gateway prompt over all batch results (a reduce). "
"Repeatable: each runs on the previous one's output. For a single source it "
"extends the --llm chain over that transcript.",
rich_help_panel=help_panels.OPT_LLM,
),
model: str = typer.Option(
llm.DEFAULT_MODEL,
"--model",
Expand Down Expand Up @@ -410,6 +418,7 @@ def transcribe(
config_kv=config_kv,
config_file=config_file,
llm_prompt=llm_prompt,
llm_reduce=llm_reduce,
model=model,
max_tokens=max_tokens,
output_field=output_field,
Expand Down
Loading
Loading