diff --git a/pkg/telemetry/streamer.go b/pkg/telemetry/streamer.go index 49d77758..32bd0e91 100644 --- a/pkg/telemetry/streamer.go +++ b/pkg/telemetry/streamer.go @@ -218,7 +218,7 @@ func (c *streamingClient) handleLines(ctx context.Context, reader *bufio.Reader) go func() { defer close(lines) for { - line, err := reader.ReadString('\n') + line, err := readACMILine(reader) select { case lines <- result{line, err}: case <-ctx.Done(): @@ -252,29 +252,37 @@ func (c *streamingClient) handleLines(ctx context.Context, reader *bufio.Reader) } return fmt.Errorf("error reading line: %w", r.err) } - if err := c.handleLine(r.line, reader); err != nil { + if err := c.handleLine(r.line); err != nil { return fmt.Errorf("error reading ACMI stream: %w", err) } } } } -func (c *streamingClient) handleLine(line string, reader *bufio.Reader) error { - if strings.HasSuffix(line, "\\\n") { - var sb strings.Builder - sb.WriteString(line[:len(line)-2]) - for { - next, err := reader.ReadString('\n') - if err != nil { - return fmt.Errorf("error reading continuation line: %w", err) - } - sb.WriteString(next) - if !strings.HasSuffix(next, "\\\n") { - break - } +func readACMILine(reader *bufio.Reader) (string, error) { + line, err := reader.ReadString('\n') + if err != nil { + return line, err + } + if !strings.HasSuffix(line, "\\\n") { + return line, nil + } + + var sb strings.Builder + sb.WriteString(line[:len(line)-2]) + for { + next, readErr := reader.ReadString('\n') + if readErr != nil { + return "", fmt.Errorf("error reading continuation line: %w", readErr) + } + sb.WriteString(next) + if !strings.HasSuffix(next, "\\\n") { + return sb.String(), nil } - line = sb.String() } +} + +func (c *streamingClient) handleLine(line string) error { line = strings.TrimSpace(line) if line == "" { return nil diff --git a/pkg/telemetry/streamer_test.go b/pkg/telemetry/streamer_test.go new file mode 100644 index 00000000..736bfaf4 --- /dev/null +++ b/pkg/telemetry/streamer_test.go @@ -0,0 +1,56 @@ +package telemetry + +import ( + "bufio" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestReadACMILineContinuationPreservesNextRecord(t *testing.T) { + t.Parallel() + + reader := bufio.NewReader(strings.NewReader(strings.Join([]string{ + "0,Comments=DCS Retribution Turn 6\\", + "====================\\", + "Most briefing information can be found on your kneeboard.", + "0,ReferenceTime=1989-09-13T10:08:31Z", + }, "\n") + "\n")) + + first, err := readACMILine(reader) + require.NoError(t, err) + assert.Equal( + t, + "0,Comments=DCS Retribution Turn 6====================\\\nMost briefing information can be found on your kneeboard.\n", + first, + ) + + second, err := readACMILine(reader) + require.NoError(t, err) + assert.Equal(t, "0,ReferenceTime=1989-09-13T10:08:31Z\n", second) +} + +func TestHandleLineAcceptsContinuationRecordBeforeNextObject(t *testing.T) { + t.Parallel() + + client := newStreamingClient(time.Second) + reader := bufio.NewReader(strings.NewReader(strings.Join([]string{ + "0,Comments=DCS Retribution Turn 6\\", + "====================\\", + "Most briefing information can be found on your kneeboard.", + "0,ReferenceTime=1989-09-13T10:08:31Z", + }, "\n") + "\n")) + + line, err := readACMILine(reader) + require.NoError(t, err) + require.NoError(t, client.handleLine(line)) + + line, err = readACMILine(reader) + require.NoError(t, err) + require.NoError(t, client.handleLine(line)) + + assert.Equal(t, time.Date(1989, 9, 13, 10, 8, 31, 0, time.UTC), client.Time()) +}