From 90236da44a6aa9ab119598fdb499656060236bee Mon Sep 17 00:00:00 2001 From: red-one1 Date: Mon, 18 May 2026 21:03:36 +1000 Subject: [PATCH 1/2] fix(telemetry): read ACMI continuation records in a single goroutine The telemetry stream reader was consuming the same bufio.Reader from two goroutines: the background line reader and handleLine() when it followed continuation lines ending with "\". That breaks parsing for multi-line global properties such as Comments in Tacview ACMI streams. Continuation content and the next object record can be interleaved, producing parse failures like: strconv.ParseUint: parsing "====================0": invalid syntax Refactor the reader so the background goroutine owns all reads and assembles a full logical ACMI record before parsing. Add regression tests covering a multi-line Comments record followed by the next global update. --- pkg/telemetry/streamer.go | 40 ++++++++++++++---------- pkg/telemetry/streamer_test.go | 56 ++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 16 deletions(-) create mode 100644 pkg/telemetry/streamer_test.go diff --git a/pkg/telemetry/streamer.go b/pkg/telemetry/streamer.go index 49d77758..32bd0e91 100644 --- a/pkg/telemetry/streamer.go +++ b/pkg/telemetry/streamer.go @@ -218,7 +218,7 @@ func (c *streamingClient) handleLines(ctx context.Context, reader *bufio.Reader) go func() { defer close(lines) for { - line, err := reader.ReadString('\n') + line, err := readACMILine(reader) select { case lines <- result{line, err}: case <-ctx.Done(): @@ -252,29 +252,37 @@ func (c *streamingClient) handleLines(ctx context.Context, reader *bufio.Reader) } return fmt.Errorf("error reading line: %w", r.err) } - if err := c.handleLine(r.line, reader); err != nil { + if err := c.handleLine(r.line); err != nil { return fmt.Errorf("error reading ACMI stream: %w", err) } } } } -func (c *streamingClient) handleLine(line string, reader *bufio.Reader) error { - if strings.HasSuffix(line, "\\\n") { - var sb strings.Builder - sb.WriteString(line[:len(line)-2]) - for { - next, err := reader.ReadString('\n') - if err != nil { - return fmt.Errorf("error reading continuation line: %w", err) - } - sb.WriteString(next) - if !strings.HasSuffix(next, "\\\n") { - break - } +func readACMILine(reader *bufio.Reader) (string, error) { + line, err := reader.ReadString('\n') + if err != nil { + return line, err + } + if !strings.HasSuffix(line, "\\\n") { + return line, nil + } + + var sb strings.Builder + sb.WriteString(line[:len(line)-2]) + for { + next, readErr := reader.ReadString('\n') + if readErr != nil { + return "", fmt.Errorf("error reading continuation line: %w", readErr) + } + sb.WriteString(next) + if !strings.HasSuffix(next, "\\\n") { + return sb.String(), nil } - line = sb.String() } +} + +func (c *streamingClient) handleLine(line string) error { line = strings.TrimSpace(line) if line == "" { return nil diff --git a/pkg/telemetry/streamer_test.go b/pkg/telemetry/streamer_test.go new file mode 100644 index 00000000..5be9cbb2 --- /dev/null +++ b/pkg/telemetry/streamer_test.go @@ -0,0 +1,56 @@ +package telemetry + +import ( + "bufio" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestReadACMILineContinuationPreservesNextRecord(t *testing.T) { + t.Parallel() + + reader := bufio.NewReader(strings.NewReader(strings.Join([]string{ + "0,Comments=DCS Retribution Turn 6\\", + "====================\\", + "Most briefing information can be found on your kneeboard.", + "0,ReferenceTime=1989-09-13T10:08:31Z", + }, "\n") + "\n")) + + first, err := readACMILine(reader) + require.NoError(t, err) + assert.Equal( + t, + "0,Comments=DCS Retribution Turn 6====================\\\nMost briefing information can be found on your kneeboard.\n", + first, + ) + + second, err := readACMILine(reader) + require.NoError(t, err) + assert.Equal(t, "0,ReferenceTime=1989-09-13T10:08:31Z\n", second) +} + +func TestHandleLineAcceptsContinuationRecordBeforeNextObject(t *testing.T) { + t.Parallel() + + client := newStreamingClient(time.Second) + reader := bufio.NewReader(strings.NewReader(strings.Join([]string{ + "0,Comments=DCS Retribution Turn 6\\", + "====================\\", + "Most briefing information can be found on your kneeboard.", + "0,ReferenceTime=1989-09-13T10:08:31Z", + }, "\n") + "\n")) + + line, err := readACMILine(reader) + require.NoError(t, err) + require.NoError(t, client.handleLine(line)) + + line, err = readACMILine(reader) + require.NoError(t, err) + require.NoError(t, client.handleLine(line)) + + assert.Equal(t, time.Date(1989, 9, 13, 10, 8, 31, 0, time.UTC), client.Time()) +} \ No newline at end of file From 73e15f9283709c632c5db2d11d2350b374a09b05 Mon Sep 17 00:00:00 2001 From: Dharma Bellamkonda Date: Mon, 18 May 2026 21:50:29 -0600 Subject: [PATCH 2/2] fix(telemetry): add missing newline at EOF in streamer_test.go Co-Authored-By: Claude Sonnet 4.6 --- pkg/telemetry/streamer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/telemetry/streamer_test.go b/pkg/telemetry/streamer_test.go index 5be9cbb2..736bfaf4 100644 --- a/pkg/telemetry/streamer_test.go +++ b/pkg/telemetry/streamer_test.go @@ -53,4 +53,4 @@ func TestHandleLineAcceptsContinuationRecordBeforeNextObject(t *testing.T) { require.NoError(t, client.handleLine(line)) assert.Equal(t, time.Date(1989, 9, 13, 10, 8, 31, 0, time.UTC), client.Time()) -} \ No newline at end of file +}