diff --git a/pkg/ai-stream/bridgev2/events.go b/pkg/ai-stream/bridgev2/events.go index 7781c5ea..1fe09a3a 100644 --- a/pkg/ai-stream/bridgev2/events.go +++ b/pkg/ai-stream/bridgev2/events.go @@ -69,6 +69,22 @@ func ApprovalPrompt(portalKey networkid.PortalKey, sender networkid.UserID, ctx } } +func FinalEditExtra(extra map[string]any) map[string]any { + out := make(map[string]any, len(extra)+1) + for key, value := range extra { + out[key] = value + } + out["com.beeper.stream"] = nil + return out +} + +func FinalEditTopLevelExtra() map[string]any { + return map[string]any{ + "com.beeper.dont_render_edited": true, + "com.beeper.stream": nil, + } +} + func FinalMetadataEdit(portalKey networkid.PortalKey, sender networkid.UserID, messageID networkid.MessageID, run aistream.Run, timestamp time.Time) *simplevent.Message[*aistream.Run] { finalContent, finalExtra := aimatrix.FinalContent(run) return FinalMetadataEditWithContent(portalKey, sender, messageID, run, finalContent, finalExtra, timestamp) @@ -86,13 +102,11 @@ func FinalMetadataEditWithContent(portalKey networkid.PortalKey, sender networki } return &bridgev2.ConvertedEdit{ ModifiedParts: []*bridgev2.ConvertedEditPart{{ - Part: existing[0], - Type: event.EventMessage, - Content: finalContent, - Extra: finalExtra, - TopLevelExtra: map[string]any{ - "com.beeper.dont_render_edited": true, - }, + Part: existing[0], + Type: event.EventMessage, + Content: finalContent, + Extra: FinalEditExtra(finalExtra), + TopLevelExtra: FinalEditTopLevelExtra(), }}, }, nil }, diff --git a/pkg/ai-stream/bridgev2/events_test.go b/pkg/ai-stream/bridgev2/events_test.go index aaf9f77a..ae461e5c 100644 --- a/pkg/ai-stream/bridgev2/events_test.go +++ b/pkg/ai-stream/bridgev2/events_test.go @@ -104,9 +104,19 @@ func TestFinalMetadataEditUsesCompactAnchorContent(t *testing.T) { if err != nil { t.Fatal(err) } - ai, ok := converted.ModifiedParts[0].Extra[aistream.BeeperAIKey].(aistream.BeeperAI) + part := converted.ModifiedParts[0] + ai, ok := part.Extra[aistream.BeeperAIKey].(aistream.BeeperAI) if !ok || ai.Kind != aistream.AIKindFinal { - t.Fatalf("final metadata edit missing final AI payload: %#v", converted.ModifiedParts[0].Extra) + t.Fatalf("final metadata edit missing final AI payload: %#v", part.Extra) + } + if stream, ok := part.Extra["com.beeper.stream"]; !ok || stream != nil { + t.Fatalf("final metadata edit must clear stream in m.new_content: %#v", part.Extra) + } + if stream, ok := part.TopLevelExtra["com.beeper.stream"]; !ok || stream != nil { + t.Fatalf("final metadata edit must clear stream at top level: %#v", part.TopLevelExtra) + } + if part.TopLevelExtra["com.beeper.dont_render_edited"] != true { + t.Fatalf("final metadata edit missing no-render-edited marker: %#v", part.TopLevelExtra) } } diff --git a/pkg/ai-stream/run.go b/pkg/ai-stream/run.go index 991f5032..335dc803 100644 --- a/pkg/ai-stream/run.go +++ b/pkg/ai-stream/run.go @@ -182,6 +182,7 @@ type ArtifactSummary struct { type Writer struct { Run *Run builder agui.EventBuilder + now func() time.Time textMessages map[int]string textOpen map[int]bool textContentWritten bool @@ -237,6 +238,7 @@ func NewWriter(run *Run, now func() time.Time) *Writer { writer := &Writer{ Run: run, builder: agui.NewEventBuilder(run.Model, now), + now: now, textMessages: map[int]string{}, textOpen: map[int]bool{}, textIndexOffset: textIndexOffset, @@ -252,6 +254,15 @@ func NewWriter(run *Run, now func() time.Time) *Writer { return writer } +func (w *Writer) SetModel(model string) { + model = strings.TrimSpace(model) + if w == nil || w.Run == nil || model == "" { + return + } + w.Run.Model = model + w.builder = agui.NewEventBuilder(model, w.now) +} + func (w *Writer) Add(evt agui.Event) { if w == nil || w.Run == nil || evt.Len() == 0 { return diff --git a/pkg/ai-stream/stream_test.go b/pkg/ai-stream/stream_test.go index 9a26db12..09fd06fb 100644 --- a/pkg/ai-stream/stream_test.go +++ b/pkg/ai-stream/stream_test.go @@ -30,6 +30,21 @@ func TestPackRunDoesNotSplitOrTruncateBySize(t *testing.T) { } } +func TestWriterSetModelUpdatesRunAndFutureEventEnvelopes(t *testing.T) { + run := NewRun("run-1", "thread-1", "old/model", "ai", "AI", time.Unix(10, 0)) + writer := NewWriter(run, func() time.Time { return time.Unix(10, 0) }) + writer.Start() + writer.SetModel("new/model") + writer.Text("hello") + + if run.Model != "new/model" { + t.Fatalf("run model = %q, want new/model", run.Model) + } + if got := run.Events[len(run.Events)-1].Get("model"); got != "new/model" { + t.Fatalf("future event model = %#v, want new/model", got) + } +} + func TestPackRunDoesNotPutFinalizationTotalsOnStreamEnvelopes(t *testing.T) { run := NewRun("run-1", "thread-1", DefaultModel, "ai", "AI", time.Unix(10, 0)) writer := NewWriter(run, func() time.Time { return time.Unix(10, 0) }) diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 8dd82adc..0d8f3cfb 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -988,13 +988,11 @@ func (cl *Client) assistantFinalEditWithProjection(portalKey networkid.PortalKey } return &bridgev2.ConvertedEdit{ ModifiedParts: []*bridgev2.ConvertedEditPart{{ - Part: existing[0], - Type: event.EventMessage, - Content: projection.Content, - Extra: projection.Extra, - TopLevelExtra: map[string]any{ - "com.beeper.dont_render_edited": true, - }, + Part: existing[0], + Type: event.EventMessage, + Content: projection.Content, + Extra: aibridgev2.FinalEditExtra(projection.Extra), + TopLevelExtra: aibridgev2.FinalEditTopLevelExtra(), }}, }, nil } diff --git a/pkg/connector/stream_test.go b/pkg/connector/stream_test.go index 305c9d93..88952098 100644 --- a/pkg/connector/stream_test.go +++ b/pkg/connector/stream_test.go @@ -1008,6 +1008,13 @@ func TestAssistantEventMetadataCanBeFinalizedBeforeInsert(t *testing.T) { if profile == nil || profile.ID != string(aiid.ModelContactID("beeper", "gpt-5")) || profile.Displayname != "gpt-5" || !profile.HasFallback { t.Fatalf("assistant final edit missing model profile: %#v", profile) } + part := converted.ModifiedParts[0] + if stream, ok := part.Extra["com.beeper.stream"]; !ok || stream != nil { + t.Fatalf("assistant final edit must clear stream in m.new_content: %#v", part.Extra) + } + if stream, ok := part.TopLevelExtra["com.beeper.stream"]; !ok || stream != nil { + t.Fatalf("assistant final edit must clear stream at top level: %#v", part.TopLevelExtra) + } } func TestAssistantFallbackEventsUseStrictMatrixOrder(t *testing.T) {