
chore: loki and opensearch improvements #1749

Open
moshloop wants to merge 8 commits into main from fix/loki-opensearch

Conversation

@moshloop
Member

@moshloop commented Jan 28, 2026

Summary

  • Improve OpenSearch APIs and add search tests
  • Refactor Loki tests to use deps package
  • Fix OpenSearch client
  • E2E test fixes

Changes

  • logs/opensearch/ - Enhanced search functionality and tests
  • logs/loki/ - Moved loki tests to use deps, added loki server config
  • Updated CI workflow and test configuration

Summary by CodeRabbit

  • Dependencies

    • Broad refresh of many Go modules and indirect replacements across cloud, DB, observability, and tooling stacks.
  • New Features

    • Paginated (scroll) search for logs.
    • Embeddable/local Loki test server and configuration for in-process log testing.
  • Tests

    • Simplified CI e2e/test workflows and updated tests for OpenSearch JSON handling; removed external Loki service dependency.
  • Bug Fixes

    • Improved cloud credentials parsing and HTTP error status handling.
  • Chores

    • Updated ignored files list.


@github-actions

github-actions bot commented Jan 28, 2026

Benchstat

Base: 15407cc3171eff30ef7c3688119072198068c439
Head: 34d28273ad9db62f9acf366d27da5fb5ebd3e2eb

goos: linux
goarch: amd64
pkg: github.com/flanksource/duty/bench
cpu: AMD EPYC 7763 64-Core Processor                
                                                │ bench-base.txt │          bench-head.txt           │
                                                │     sec/op     │   sec/op     vs base              │
Main/Sample-15000/catalog_changes/Without_RLS-4     5.497m ±  7%   5.323m ± 2%  -3.16% (p=0.015 n=6)
Main/Sample-15000/catalog_changes/With_RLS-4        129.2m ±  1%   128.1m ± 1%  -0.80% (p=0.026 n=6)
Main/Sample-15000/config_changes/Without_RLS-4      5.182m ±  4%   5.254m ± 2%       ~ (p=0.485 n=6)
Main/Sample-15000/config_changes/With_RLS-4         129.1m ±  1%   127.7m ± 1%  -1.07% (p=0.002 n=6)
Main/Sample-15000/config_detail/Without_RLS-4       3.994m ± 10%   3.835m ± 4%  -3.97% (p=0.009 n=6)
Main/Sample-15000/config_detail/With_RLS-4          126.6m ±  1%   123.8m ± 1%  -2.27% (p=0.002 n=6)
Main/Sample-15000/config_names/Without_RLS-4        13.29m ±  3%   12.59m ± 4%  -5.28% (p=0.009 n=6)
Main/Sample-15000/config_names/With_RLS-4           127.2m ±  2%   127.2m ± 0%       ~ (p=0.937 n=6)
Main/Sample-15000/config_summary/Without_RLS-4      66.15m ±  4%   63.43m ± 1%  -4.11% (p=0.015 n=6)
Main/Sample-15000/config_summary/With_RLS-4         752.8m ±  1%   745.6m ± 0%  -0.96% (p=0.015 n=6)
Main/Sample-15000/configs/Without_RLS-4             7.539m ±  9%   7.433m ± 3%       ~ (p=0.310 n=6)
Main/Sample-15000/configs/With_RLS-4                127.1m ±  1%   122.6m ± 1%  -3.52% (p=0.002 n=6)
Main/Sample-15000/analysis_types/Without_RLS-4      4.033m ±  5%   3.821m ± 3%  -5.24% (p=0.009 n=6)
Main/Sample-15000/analysis_types/With_RLS-4         3.961m ±  1%   3.855m ± 3%  -2.67% (p=0.041 n=6)
Main/Sample-15000/analyzer_types/Without_RLS-4      3.733m ±  3%   3.699m ± 2%       ~ (p=0.180 n=6)
Main/Sample-15000/analyzer_types/With_RLS-4         4.050m ±  2%   3.737m ± 3%  -7.73% (p=0.002 n=6)
Main/Sample-15000/change_types/Without_RLS-4        5.404m ± 15%   5.309m ± 5%       ~ (p=0.937 n=6)
Main/Sample-15000/change_types/With_RLS-4           5.391m ±  3%   5.349m ± 3%       ~ (p=0.394 n=6)
Main/Sample-15000/config_classes/Without_RLS-4      3.392m ±  2%   3.304m ± 2%  -2.61% (p=0.009 n=6)
Main/Sample-15000/config_classes/With_RLS-4         124.6m ±  1%   123.6m ± 1%  -0.75% (p=0.026 n=6)
Main/Sample-15000/config_types/Without_RLS-4        3.860m ±  1%   3.949m ± 4%       ~ (p=0.065 n=6)
Main/Sample-15000/config_types/With_RLS-4           123.8m ±  2%   123.6m ± 2%       ~ (p=0.394 n=6)
geomean                                             19.62m         19.21m       -2.10%

@coderabbitai
Contributor

coderabbitai bot commented Jan 28, 2026

Warning

Rate limit exceeded

@moshloop has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 21 minutes and 14 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

Walkthrough

Consolidates CI/test orchestration, replaces external Loki with an in-process Loki test server, adds OpenSearch scroll APIs and @json field preprocessing, refreshes many Go module versions, updates GCP credential handling to use parsed credentials, and tweaks minor API error handling.

Changes

Cohort / File(s) Summary
CI/CD & Test Orchestration
/.github/workflows/test.yaml, Makefile, /.gitignore
Reduced Postgres matrix to 15/16, unified env quoting, removed Loki service from CI, added deps-fetch step, switched E2E invocation to make test-e2e, removed ginkgo target, and changed .gitignore to ignore ginkgo-report*.
Loki In-Process Testing
logs/loki/loki-config.yaml, logs/loki/server.go, logs/loki/loki_test.go
Added Loki config, new Server helper (Start/Stop, config generation, binary install, readiness), LogEntry/LogStream types and UploadLogs, and refactored tests to use embedded server instead of external service.
OpenSearch Search Enhancements
logs/opensearch/search.go, logs/opensearch/types.go, logs/opensearch/search_test.go
Renamed searcher → Searcher, exposed raw client, reworked client init, added scroll APIs (SearchWithScroll, ScrollNext, ClearScroll) and types, and implemented @json field preprocessing with tests.
E2E Test Suite Cleanup
tests/e2e/docker-compose.yaml, tests/e2e/suite_test.go
Removed Loki service from docker-compose and deleted e2e suite bootstrap (suite_test.go) including DefaultContext and Before/After hooks.
Go Dependency Refresh
go.mod
Large multi-package dependency updates and replacement mappings across cloud SDKs, AWS, OpenTelemetry, k8s, DB drivers, and tooling; many version bumps and replacements.
GCP Credential Handling
connection/gcs.go, connection/gke.go
Switched from option.WithCredentialsJSON to parsing JSON via google.CredentialsFromJSON and using option.WithCredentials, with added error handling.
API Error Handling
api/http.go
For oops.OopsError, now asserts Code() to string before passing to ErrorStatusCode (defaults to 500 on failure).
Tests & Ignored Files
.gitignore, tests/...logs/loki
.gitignore now ignores ginkgo-report*; tests updated to use embedded Loki and e2e bootstrap removed.
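The api/http.go change above comes down to a defensive type assertion on the error code before the status lookup. A minimal stand-alone sketch of the pattern (codeCarrier and errorStatusCode are illustrative stand-ins, not the real oops error or ErrorStatusCode):

```go
package main

import "fmt"

// codeCarrier mimics the relevant part of an oops-style error: Code() returns any.
type codeCarrier struct{ code any }

func (c codeCarrier) Code() any { return c.code }

// errorStatusCode is a stand-in for the real ErrorStatusCode lookup.
func errorStatusCode(code string) int {
	if code == "not_found" {
		return 404
	}
	return 500
}

// statusFor asserts Code() to string before the lookup, defaulting to 500
// when the code is not a string (the hardening described in the walkthrough).
func statusFor(e codeCarrier) int {
	if s, ok := e.Code().(string); ok {
		return errorStatusCode(s)
	}
	return 500 // non-string codes fall back to Internal Server Error
}

func main() {
	fmt.Println(statusFor(codeCarrier{code: "not_found"})) // 404
	fmt.Println(statusFor(codeCarrier{code: 42}))          // 500
}
```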

Sequence Diagram(s)

sequenceDiagram
    participant Test as Test Code
    participant Server as Loki Server Helper
    participant Process as OS Process
    participant Ready as /ready endpoint

    Test->>Server: NewServer(config)
    Test->>Server: Start()
    Server->>Server: generateConfig() & write files
    Server->>Process: exec loki binary
    Process-->>Server: process handle
    Server->>Ready: poll /ready
    Ready-->>Server: HTTP 200
    Server-->>Test: Ready
    Test->>Server: UploadLogs(streams)
    Server->>Server: format & POST /loki/api/v1/push
    Test->>Server: Stop()
    Server->>Process: terminate process
    Process-->>Server: exited
sequenceDiagram
    participant Client as Caller
    participant Searcher as Searcher
    participant OpenSearch as OpenSearch API
    participant Preproc as JSON Preprocessor

    Client->>Searcher: SearchWithScroll(ctx, req)
    Searcher->>OpenSearch: POST /_search (with scroll)
    OpenSearch-->>Searcher: hits + scroll_id
    Searcher->>Preproc: preprocessJSONFields(hits)
    Preproc-->>Searcher: processed hits
    Searcher-->>Client: LogResult, scroll_id

    Client->>Searcher: ScrollNext(ctx, scroll_id)
    Searcher->>OpenSearch: POST /_search/scroll
    OpenSearch-->>Searcher: next hits
    Searcher->>Preproc: preprocessJSONFields(hits)
    Preproc-->>Searcher: processed hits
    Searcher-->>Client: LogResult, new_scroll_id

    Client->>Searcher: ClearScroll(ctx, scroll_id)
    Searcher->>OpenSearch: DELETE /_search/scroll
    OpenSearch-->>Searcher: acknowledged
    Searcher-->>Client: cleared
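The scroll flow above follows the standard OpenSearch pattern: open a scroll with the initial search, keep fetching until a page comes back empty, then clear the scroll context. A stand-alone sketch of that loop (page, drainScroll, and the callbacks are illustrative, not the real SearchWithScroll/ScrollNext/ClearScroll signatures):

```go
package main

import "fmt"

// page mimics one scroll response: a batch of hits plus the scroll ID to
// pass to the next call.
type page struct {
	Hits     []string
	ScrollID string
}

// drainScroll shows the canonical scroll loop: consume the first page, keep
// calling next until a page is empty, then clear the scroll context.
func drainScroll(first page, next func(scrollID string) page, clearScroll func(scrollID string)) []string {
	var all []string
	p := first
	for len(p.Hits) > 0 {
		all = append(all, p.Hits...)
		p = next(p.ScrollID)
	}
	clearScroll(p.ScrollID)
	return all
}

func main() {
	// Simulated pages; in the real client these come from OpenSearch.
	pages := []page{
		{Hits: []string{"log-1", "log-2"}, ScrollID: "s1"},
		{Hits: []string{"log-3"}, ScrollID: "s2"},
		{Hits: nil, ScrollID: "s2"},
	}
	i := 0
	next := func(string) page { i++; return pages[i] }
	cleared := false
	fmt.Println(drainScroll(pages[0], next, func(string) { cleared = true }), cleared)
	// [log-1 log-2 log-3] true
}
```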
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: docstring coverage is 11.11%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
✅ Passed checks (2 passed)
  • Description Check ✅ Passed: check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed: the PR title 'chore: loki and opensearch improvements' accurately reflects the main changes, including significant enhancements to the Loki and OpenSearch APIs, test refactoring, and CI/workflow updates.




@coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
.github/workflows/test.yaml (1)

124-124: Dead code: condition references removed PostgreSQL version.

The condition matrix.postgres-version.tag == '14' will never evaluate to true since PostgreSQL 14 was removed from the matrix (only versions 15 and 16 remain). This results in DUTY_DB_DISABLE_RLS always being set to 'false'.

If version 14 support is intentionally removed, consider simplifying:

🧹 Proposed cleanup
       - name: Apply base migrations
         run: ${{ env.migrate_command }}
         env:
-          DUTY_DB_DISABLE_RLS: ${{ matrix.postgres-version.tag == '14' && 'true' || 'false' }}
+          DUTY_DB_DISABLE_RLS: "false"
...
       - name: Apply new migrations
         run: ${{ env.migrate_command }}
         env:
-          DUTY_DB_DISABLE_RLS: ${{ matrix.postgres-version.tag == '14' && 'true' || 'false' }}
+          DUTY_DB_DISABLE_RLS: "false"

Also applies to: 132-132

logs/opensearch/search.go (1)

64-71: Close the response body after Ping() to avoid leaking connections.

The OpenSearch client Ping() response body must be closed to release resources, consistent with how other API responses are handled in this file (Search, Scroll, ClearScroll all use defer res.Body.Close()).

🔧 Proposed fix
 	pingResp, err := client.Ping()
 	if err != nil {
 		return nil, ctx.Oops().Wrapf(err, "error pinging the openSearch client")
 	}
+	defer pingResp.Body.Close()
 
 	if pingResp.StatusCode != 200 {
 		return nil, ctx.Oops().Errorf("[opensearch] got ping response: %d", pingResp.StatusCode)
 	}
🤖 Fix all issues with AI agents
In `@logs/loki/loki_test.go`:
- Around line 32-46: The setup ignores errors from os.MkdirTemp and never cleans
the tempDir on teardown; update the ginkgo.BeforeAll block to check and handle
the error returned by os.MkdirTemp (fail the test on error) and ensure tempDir
is recorded; in ginkgo.AfterAll, after stopping lokiServer (if not nil) call
os.RemoveAll(tempDir) (guarding for non-empty tempDir) to delete the temp
directory and avoid test residue, and propagate or fail the test if RemoveAll
returns an error.

In `@logs/loki/server.go`:
- Line 63: The call to os.MkdirAll(s.dataPath, 0755) currently discards its
error; update the code to check the returned error from os.MkdirAll and handle
it (e.g., return or log a descriptive error including s.dataPath and the
original error) before proceeding to any file writes; locate the call to
os.MkdirAll in the server code where s.dataPath is used and replace the ignored
result with proper error handling that surfaces permission or path issues.
- Around line 113-144: UploadLogs currently mutates the caller's data by
modifying streams[i].Labels; to fix, avoid in-place mutation by creating a local
copy of each LogStream and its Labels map before adding extraLabels (e.g., for
each stream in UploadLogs, construct a new LogStream value or shallow-copy the
struct and make a new map for Labels, then copy existing labels and merge
extraLabels into that copy), then call toLokiFormat on the copied stream;
reference the UploadLogs function, LogStream type, Labels field, and
toLokiFormat method when implementing the non-mutating copy/merge.

In `@logs/opensearch/search_test.go`:
- Around line 1-219: Replace standard testing assertions in the tests with
Gomega: create a g := gomega.NewWithT(t) at the start of each test (e.g.,
TestPreprocessJSONFields, TestPreprocessJSONFieldsModifiesInPlace,
TestParseSearchResponseWithJSONFields) and use g.Expect(...) constructs instead
of t.Errorf/t.Fatalf and reflect.DeepEqual; for equality use
g.Expect(value).To(gomega.Equal(expected)), for existence use
g.Expect(ok).To(gomega.BeTrue()) or
g.Expect(found).To(gomega.HaveKeyWithValue(...)), and for nil/non-nil checks use
g.Expect(...).ToNot(gomega.BeNil()) as appropriate when verifying
preprocessJSONFields mutation and searcher.parseSearchResponse outputs. Ensure
you import gomega in the test file.

In `@logs/opensearch/types.go`:
- Around line 27-35: The Timeout field on ScrollOptions uses time.Duration which
JSON-serializes as an integer; change its type to the project's types.Duration
wrapper so it serializes as a human-readable string. Update the struct
declaration for ScrollOptions to use types.Duration for the Timeout field and
add the appropriate import for the types package; then audit any code that
constructs or reads ScrollOptions.Timeout and convert between types.Duration and
time.Duration where needed (e.g., use types.Duration(30*time.Second) when
setting or access .Duration when a raw time.Duration is required).
🧹 Nitpick comments (2)
logs/loki/server.go (2)

78-86: Stop() masks errors by always returning nil.

The method logs a warning if stopping fails but always returns nil. Callers cannot determine if the process was stopped successfully. Consider returning the error or documenting this intentional behavior.

♻️ Proposed fix to propagate the error
 func (s *Server) Stop() error {
 	if s.process != nil {
 		if err := s.process.Stop(); err != nil {
-			logger.Warnf("failed to stop loki process: %v", err)
+			return fmt.Errorf("failed to stop loki process: %w", err)
 		}
 	}
-
 	return nil
 }

142-143: Hardcoded sleep for eventual consistency.

The fixed 2-second sleep is a workaround for Loki's eventual consistency. Consider making this configurable or documenting why this specific duration was chosen. For tests, this adds unnecessary latency when logs may be available sooner.

Comment thread logs/loki/loki_test.go
Comment on lines 32 to +46
ginkgo.BeforeAll(func() {
lokiURL = os.Getenv("LOKI_URL")
if lokiURL == "" {
lokiURL = "http://localhost:3100"
}
ctx = DefaultContext

Eventually(func() error {
resp, err := http.Get(lokiURL + "/ready")
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return fmt.Errorf("loki not ready, status: %d", resp.StatusCode)
}

return nil
}, 30*time.Second, 2*time.Second).Should(Succeed())
ctx = dutyCtx.NewContext(context.Background())
tempDir, _ = os.MkdirTemp("", "loki")
lokiServer = loki.NewServer(loki.ServerConfig{DataPath: tempDir})
err := lokiServer.Start()
Expect(err).NotTo(HaveOccurred())

err := injectLokiLogs(lokiURL, map[string]string{"source": "setup"})
err = lokiServer.UploadLogs(testLogStreams(), map[string]string{"source": "setup"})
Expect(err).NotTo(HaveOccurred())
})

ginkgo.AfterAll(func() {
if lokiServer != nil {
_ = lokiServer.Stop()
}

⚠️ Potential issue | 🟡 Minor

Handle temp dir creation errors (and clean up).

Ignoring os.MkdirTemp errors can mask setup failures; also remove the temp dir in teardown to avoid test residue.

🔧 Proposed fix
 ginkgo.BeforeAll(func() {
 	ctx = dutyCtx.NewContext(context.Background())
-	tempDir, _ = os.MkdirTemp("", "loki")
+	tempDir, err := os.MkdirTemp("", "loki")
+	Expect(err).NotTo(HaveOccurred())
 	lokiServer = loki.NewServer(loki.ServerConfig{DataPath: tempDir})
-	err := lokiServer.Start()
+	err = lokiServer.Start()
 	Expect(err).NotTo(HaveOccurred())

 	err = lokiServer.UploadLogs(testLogStreams(), map[string]string{"source": "setup"})
 	Expect(err).NotTo(HaveOccurred())
 })
 
 ginkgo.AfterAll(func() {
 	if lokiServer != nil {
 		_ = lokiServer.Stop()
 	}
+	if tempDir != "" {
+		_ = os.RemoveAll(tempDir)
+	}
 })

Comment thread logs/loki/server.go
s.binPath = filepath.Join(res.BinDir, "loki")
s.dataPath = s.config.DataPath

_ = os.MkdirAll(s.dataPath, 0755)

⚠️ Potential issue | 🟡 Minor

Handle directory creation error.

The error from os.MkdirAll is silently discarded. While the subsequent WriteFile will fail if the directory doesn't exist, the error message won't indicate the root cause (e.g., permission denied on parent directory).

🐛 Proposed fix
-	_ = os.MkdirAll(s.dataPath, 0755)
+	if err := os.MkdirAll(s.dataPath, 0755); err != nil {
+		return fmt.Errorf("failed to create data directory: %w", err)
+	}

Comment thread logs/loki/server.go
Comment on lines +113 to +144
func (s *Server) UploadLogs(streams []LogStream, extraLabels map[string]string) error {
for i := range streams {
if streams[i].Labels == nil {
streams[i].Labels = make(map[string]string)
}
maps.Copy(streams[i].Labels, extraLabels)
}

lokiStreams := make([]map[string]any, len(streams))
for i, stream := range streams {
lokiStreams[i] = stream.toLokiFormat()
}

logData := map[string]any{"streams": lokiStreams}
jsonData, err := json.Marshal(logData)
if err != nil {
return fmt.Errorf("failed to marshal log data: %w", err)
}

resp, err := http.Post(s.URL()+"/loki/api/v1/push", "application/json", bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to push logs to loki: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("failed to push logs, status code: %d", resp.StatusCode)
}

time.Sleep(2 * time.Second)
return nil
}

⚠️ Potential issue | 🟡 Minor

UploadLogs mutates the input slice.

The method modifies streams[i].Labels directly, which mutates the caller's data. This could cause unexpected side effects if the caller reuses the slice.

🐛 Proposed fix to avoid mutating input
 func (s *Server) UploadLogs(streams []LogStream, extraLabels map[string]string) error {
+	lokiStreams := make([]map[string]any, len(streams))
 	for i := range streams {
-		if streams[i].Labels == nil {
-			streams[i].Labels = make(map[string]string)
+		// Create a copy of labels to avoid mutating input
+		labels := make(map[string]string)
+		maps.Copy(labels, streams[i].Labels)
+		maps.Copy(labels, extraLabels)
+		
+		streamCopy := LogStream{
+			Labels:  labels,
+			Entries: streams[i].Entries,
 		}
-		maps.Copy(streams[i].Labels, extraLabels)
-	}
-
-	lokiStreams := make([]map[string]any, len(streams))
-	for i, stream := range streams {
-		lokiStreams[i] = stream.toLokiFormat()
+		lokiStreams[i] = streamCopy.toLokiFormat()
 	}

Comment thread logs/opensearch/search_test.go
Comment on lines +1 to +219
package opensearch

import (
"reflect"
"testing"

dutyContext "github.com/flanksource/duty/context"
"github.com/flanksource/duty/logs"
)

func TestPreprocessJSONFields(t *testing.T) {
tests := []struct {
name string
input map[string]any
expected map[string]any
}{
{
name: "valid JSON in @json field",
input: map[string]any{
"message": "test message",
"config@json": `{"key": "value", "number": 42}`,
"other_field": "normal value",
},
expected: map[string]any{
"message": "test message",
"config@json": map[string]any{"key": "value", "number": float64(42)},
"other_field": "normal value",
},
},
{
name: "invalid JSON in @json field - should remain unchanged",
input: map[string]any{
"message": "test message",
"config@json": `{"key": "value", invalid json}`,
"other_field": "normal value",
},
expected: map[string]any{
"message": "test message",
"config@json": `{"key": "value", invalid json}`,
"other_field": "normal value",
},
},
{
name: "non-string value in @json field - should remain unchanged",
input: map[string]any{
"message": "test message",
"config@json": 42,
"other_field": "normal value",
},
expected: map[string]any{
"message": "test message",
"config@json": 42,
"other_field": "normal value",
},
},
{
name: "nested JSON objects",
input: map[string]any{
"complex@json": `{"nested": {"deep": {"value": "test"}}, "array": [1, 2, 3]}`,
},
expected: map[string]any{
"complex@json": map[string]any{
"nested": map[string]any{
"deep": map[string]any{
"value": "test",
},
},
"array": []any{float64(1), float64(2), float64(3)},
},
},
},
{
name: "empty JSON object",
input: map[string]any{
"empty@json": `{}`,
},
expected: map[string]any{
"empty@json": map[string]any{},
},
},
{
name: "empty JSON array",
input: map[string]any{
"empty@json": `[]`,
},
expected: map[string]any{
"empty@json": []any{},
},
},
{
name: "fields without @json - should remain unchanged",
input: map[string]any{
"message": "test message",
"config": `{"key": "value"}`,
"other_field": "normal value",
},
expected: map[string]any{
"message": "test message",
"config": `{"key": "value"}`,
"other_field": "normal value",
},
},
{
name: "mixed valid and invalid JSON fields",
input: map[string]any{
"valid@json": `{"valid": true}`,
"invalid@json": `{invalid json}`,
},
expected: map[string]any{
"valid@json": map[string]any{"valid": true},
"invalid@json": `{invalid json}`,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Make a copy of input to avoid modifying the test case
input := make(map[string]any)
for k, v := range tt.input {
input[k] = v
}

preprocessJSONFields(input)

if !reflect.DeepEqual(input, tt.expected) {
t.Errorf("preprocessJSONFields() got = %v, want %v", input, tt.expected)
}
})
}
}

func TestPreprocessJSONFieldsModifiesInPlace(t *testing.T) {
original := map[string]any{
"config@json": `{"modified": true}`,
"unchanged": "value",
}

preprocessJSONFields(original)

// Verify the original map was modified
configValue, ok := original["config@json"].(map[string]any)
if !ok {
t.Errorf("Expected config@json to be unmarshalled to map[string]any, got %T", original["config@json"])
return
}

if configValue["modified"] != true {
t.Errorf("Expected config@json.modified to be true, got %v", configValue["modified"])
}

// Verify unchanged field remains the same
if original["unchanged"] != "value" {
t.Errorf("Expected unchanged field to remain 'value', got %v", original["unchanged"])
}
}

func TestParseSearchResponseWithJSONFields(t *testing.T) {
searcher := &Searcher{
mappingConfig: &logs.FieldMappingConfig{
Message: []string{"message"},
},
}

response := Response{
Hits: HitsInfo{
Hits: []SearchHit{
{
ID: "test-id-1",
Source: map[string]any{
"message": "Test log message",
"config@json": `{"environment": "test", "debug": true, "port": 8080}`,
"invalid@json": `{broken json}`,
"normal_field": "regular value",
},
},
},
},
}

ctx := dutyContext.New()
result := searcher.parseSearchResponse(ctx, response)

if len(result.Logs) != 1 {
t.Fatalf("Expected 1 log entry, got %d", len(result.Logs))
}

logEntry := result.Logs[0]

// Verify message was extracted
if logEntry.Message != "Test log message" {
t.Errorf("Expected message 'Test log message', got '%s'", logEntry.Message)
}

// Verify ID was set
if logEntry.ID != "test-id-1" {
t.Errorf("Expected ID 'test-id-1', got '%s'", logEntry.ID)
}

// Check that JSON fields were processed and flattened into labels
expectedLabels := map[string]string{
// From config@json
"config@json.environment": "test",
"config@json.debug": "true",
"config@json.port": "8080",
// Invalid JSON should remain as string
"invalid@json": `{broken json}`,
// Normal field
"normal_field": "regular value",
}

for expectedKey, expectedValue := range expectedLabels {
if actualValue, exists := logEntry.Labels[expectedKey]; !exists {
t.Errorf("Expected label '%s' to exist", expectedKey)
} else if actualValue != expectedValue {
t.Errorf("Expected label '%s' to have value '%s', got '%s'", expectedKey, expectedValue, actualValue)
}
}
}
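For reference, a minimal implementation matching the behavior these tests exercise might look like this (reconstructed from the test expectations, not the actual preprocessJSONFields in search.go):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// preprocessJSONFields mirrors the behavior implied by the tests: any field
// whose key ends in "@json" and whose value is a string containing valid
// JSON is unmarshalled in place; non-string values and invalid JSON are
// left untouched.
func preprocessJSONFields(doc map[string]any) {
	for key, val := range doc {
		if !strings.HasSuffix(key, "@json") {
			continue
		}
		s, ok := val.(string)
		if !ok {
			continue // non-string values stay as-is
		}
		var parsed any
		if err := json.Unmarshal([]byte(s), &parsed); err != nil {
			continue // invalid JSON stays as the original string
		}
		doc[key] = parsed
	}
}

func main() {
	doc := map[string]any{
		"config@json":  `{"key": "value"}`,
		"invalid@json": `{broken}`,
		"message":      "hello",
	}
	preprocessJSONFields(doc)
	fmt.Printf("%T %v %v\n", doc["config@json"], doc["invalid@json"], doc["message"])
}
```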

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Check if the file exists and verify its current content
fd "search_test.go" logs/

Repository: flanksource/duty

Length of output: 119


🏁 Script executed:

# Check test patterns in the same directory
fd "_test.go" logs/opensearch/ | head -5

Repository: flanksource/duty

Length of output: 90


🏁 Script executed:

# Check how other tests in the repo use assertions
rg "gomega.NewWithT" --type go -l | head -10

Repository: flanksource/duty

Length of output: 242


🏁 Script executed:

# Check the actual content of the search_test.go file to verify it matches review
cat -n logs/opensearch/search_test.go | head -50

Repository: flanksource/duty

Length of output: 1689


🏁 Script executed:

# Check one of the existing gomega tests to see the pattern
cat -n rls/payload_test.go | head -40

Repository: flanksource/duty

Length of output: 1369


🏁 Script executed:

# Check if the file imports gomega currently
rg "gomega|reflect" logs/opensearch/search_test.go

Repository: flanksource/duty

Length of output: 117


🏁 Script executed:

# Check more of the actual test file to see full pattern of assertions
cat -n logs/opensearch/search_test.go | tail -100

Repository: flanksource/duty

Length of output: 3428


Switch these tests to Gomega assertions with gomega.NewWithT(t).

The test suite uses t.Errorf/t.Fatalf and reflect.DeepEqual, which violates the repository standards. Update to Gomega assertions per the **/*_test.go coding guidelines.

🔧 Proposed refactor
 import (
-	"reflect"
 	"testing"
 
 	dutyContext "github.com/flanksource/duty/context"
 	"github.com/flanksource/duty/logs"
+	"github.com/onsi/gomega"
 )
 
 func TestPreprocessJSONFields(t *testing.T) {
 	tests := []struct {
@@
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			g := gomega.NewWithT(t)
 			// Make a copy of input to avoid modifying the test case
 			input := make(map[string]any)
 			for k, v := range tt.input {
 				input[k] = v
 			}
 
 			preprocessJSONFields(input)
 
-			if !reflect.DeepEqual(input, tt.expected) {
-				t.Errorf("preprocessJSONFields() got = %v, want %v", input, tt.expected)
-			}
+			g.Expect(input).To(gomega.Equal(tt.expected))
 		})
 	}
 }
 
 func TestPreprocessJSONFieldsModifiesInPlace(t *testing.T) {
+	g := gomega.NewWithT(t)
 	original := map[string]any{
 		"config@json": `{"modified": true}`,
 		"unchanged":   "value",
 	}
@@
 	// Verify the original map was modified
 	configValue, ok := original["config@json"].(map[string]any)
-	if !ok {
-		t.Errorf("Expected config@json to be unmarshalled to map[string]any, got %T", original["config@json"])
-		return
-	}
+	g.Expect(ok).To(gomega.BeTrue())
+	if !ok {
+		return
+	}
 
-	if configValue["modified"] != true {
-		t.Errorf("Expected config@json.modified to be true, got %v", configValue["modified"])
-	}
+	g.Expect(configValue["modified"]).To(gomega.Equal(true))
 
 	// Verify unchanged field remains the same
-	if original["unchanged"] != "value" {
-		t.Errorf("Expected unchanged field to remain 'value', got %v", original["unchanged"])
-	}
+	g.Expect(original["unchanged"]).To(gomega.Equal("value"))
 }
 
 func TestParseSearchResponseWithJSONFields(t *testing.T) {
+	g := gomega.NewWithT(t)
 	searcher := &Searcher{
 		mappingConfig: &logs.FieldMappingConfig{
 			Message: []string{"message"},
 		},
@@
 	ctx := dutyContext.New()
 	result := searcher.parseSearchResponse(ctx, response)
 
-	if len(result.Logs) != 1 {
-		t.Fatalf("Expected 1 log entry, got %d", len(result.Logs))
-	}
+	g.Expect(result.Logs).To(gomega.HaveLen(1))
 
 	logEntry := result.Logs[0]
 
 	// Verify message was extracted
-	if logEntry.Message != "Test log message" {
-		t.Errorf("Expected message 'Test log message', got '%s'", logEntry.Message)
-	}
+	g.Expect(logEntry.Message).To(gomega.Equal("Test log message"))
 
 	// Verify ID was set
-	if logEntry.ID != "test-id-1" {
-		t.Errorf("Expected ID 'test-id-1', got '%s'", logEntry.ID)
-	}
+	g.Expect(logEntry.ID).To(gomega.Equal("test-id-1"))
@@
 	for expectedKey, expectedValue := range expectedLabels {
-		if actualValue, exists := logEntry.Labels[expectedKey]; !exists {
-			t.Errorf("Expected label '%s' to exist", expectedKey)
-		} else if actualValue != expectedValue {
-			t.Errorf("Expected label '%s' to have value '%s', got '%s'", expectedKey, expectedValue, actualValue)
-		}
+		g.Expect(logEntry.Labels).To(gomega.HaveKeyWithValue(expectedKey, expectedValue))
 	}
 }
🤖 Prompt for AI Agents
In `@logs/opensearch/search_test.go` around lines 1 - 219, replace the standard
testing assertions with Gomega: create a g := gomega.NewWithT(t) at the start of
each test (e.g., TestPreprocessJSONFields,
TestPreprocessJSONFieldsModifiesInPlace, TestParseSearchResponseWithJSONFields)
and use g.Expect(...) constructs instead of t.Errorf/t.Fatalf and
reflect.DeepEqual; for equality use g.Expect(value).To(gomega.Equal(expected)),
for existence use g.Expect(ok).To(gomega.BeTrue()) or
g.Expect(found).To(gomega.HaveKeyWithValue(...)), and for nil/non-nil checks use
g.Expect(...).ToNot(gomega.BeNil()) as appropriate when verifying
preprocessJSONFields mutation and searcher.parseSearchResponse outputs. Ensure
the test file imports gomega.
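An aside on the `float64(42)` expectations in the suggested tests: they follow from `encoding/json` semantics, which decode every JSON number into `float64` when the target is `map[string]any`. A minimal stdlib check (the `decode` helper name is illustrative, not part of the PR):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decode unmarshals a JSON document into a generic map; the helper
// name is illustrative and not part of the code under review.
func decode(s string) map[string]any {
	var m map[string]any
	if err := json.Unmarshal([]byte(s), &m); err != nil {
		panic(err)
	}
	return m
}

func main() {
	m := decode(`{"key": "value", "number": 42}`)
	// encoding/json decodes every JSON number into float64 when the
	// target is map[string]any, hence float64(42) in the test table.
	fmt.Printf("%T\n", m["number"]) // prints float64
}
```

This is also why the nested-array case expects `[]any{float64(1), float64(2), float64(3)}` rather than ints.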

Comment thread logs/opensearch/types.go
Comment on lines +27 to +35
// ScrollOptions contains configuration for scroll operations
type ScrollOptions struct {
	// Size is the number of documents to fetch per scroll request
	Size int `json:"size,omitempty"`
	// Timeout is how long to keep the scroll context alive
	Timeout time.Duration `json:"timeout,omitempty"`
	// Enabled determines if scroll should be used automatically for large result sets
	Enabled bool `json:"enabled,omitempty"`
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, check if the custom Duration type exists
find . -name "duration.go" -type f | head -20

Repository: flanksource/duty

Length of output: 79


🏁 Script executed:

# Also check the structure of the opensearch types file
ls -la logs/opensearch/ 2>/dev/null || echo "Directory not found"

Repository: flanksource/duty

Length of output: 426


🏁 Script executed:

# Check the current imports and structure of the file being reviewed
head -50 logs/opensearch/types.go

Repository: flanksource/duty

Length of output: 1497


🏁 Script executed:

# Check the custom Duration type implementation
cat types/duration.go

Repository: flanksource/duty

Length of output: 653


🏁 Script executed:

# Check if there are other examples of Duration usage in the opensearch package
grep -n "Duration\|duration" logs/opensearch/types.go

Repository: flanksource/duty

Length of output: 112


🏁 Script executed:

# Look at the full types.go file to understand the complete context
cat logs/opensearch/types.go

Repository: flanksource/duty

Length of output: 3108


Use types.Duration for the Timeout field for human-readable JSON serialization.

The Timeout field currently uses time.Duration, which serializes to nanoseconds as an integer in JSON (e.g., 30000000000 for 30 seconds). The codebase provides a custom types.Duration wrapper in types/duration.go that serializes to human-readable strings via its MarshalJSON method.

♻️ Suggested change
 // ScrollOptions contains configuration for scroll operations
 type ScrollOptions struct {
 	// Size is the number of documents to fetch per scroll request
 	Size int `json:"size,omitempty"`
 	// Timeout is how long to keep the scroll context alive
-	Timeout time.Duration `json:"timeout,omitempty"`
+	Timeout types.Duration `json:"timeout,omitempty"`
 	// Enabled determines if scroll should be used automatically for large result sets
 	Enabled bool `json:"enabled,omitempty"`
 }
🤖 Prompt for AI Agents
In `@logs/opensearch/types.go` around lines 27 - 35, The Timeout field on
ScrollOptions uses time.Duration which JSON-serializes as an integer; change its
type to the project's types.Duration wrapper so it serializes as a
human-readable string. Update the struct declaration for ScrollOptions to use
types.Duration for the Timeout field and add the appropriate import for the
types package; then audit any code that constructs or reads
ScrollOptions.Timeout and convert between types.Duration and time.Duration where
needed (e.g., use types.Duration(30*time.Second) when setting or access
.Duration when a raw time.Duration is required).

- Fix oops.Code() type assertion in api/http.go
- Replace deprecated option.WithCredentialsJSON with google.CredentialsFromJSON
- Remove duplicate shell_test.go from logs/loki

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@go.mod`:
- Line 419: The replace directive pointing to github.com/clarkmcc/gorm-sqlite in
go.mod is referencing a non-public/unverifiable fork; verify and either (A)
replace it with a public, supported alternative (e.g., remove the replace and
use github.com/glebarez/sqlite or switch to modernc.org/sqlite) or (B) ensure
the fork is publicly accessible and document why it’s required. Update go.mod to
point to the chosen public module and version (or remove the replace), and add a
short justification comment in the repository (README or go.mod comment)
describing the specific bug/workaround that required the fork (reference the
replace line and the original github.com/glebarez/sqlite) so reviewers and
future maintainers can verify the dependency choice.

Comment thread go.mod
// replace github.com/flanksource/commons => ../commons
// replace github.com/flanksource/deps => ../deps

replace github.com/glebarez/sqlite => github.com/clarkmcc/gorm-sqlite v0.0.0-20240426202654-00ed082c0311

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Check git history for any commits mentioning this replacement
git log -p --all -S "clarkmcc/gorm-sqlite" -- go.mod | head -100

Repository: flanksource/duty

Length of output: 42


🏁 Script executed:

# Search for any comments or documentation explaining this replacement
rg "clarkmcc/gorm-sqlite|glebarez/sqlite" -A 3 -B 3 --type-list | head -20
rg "clarkmcc/gorm-sqlite|glebarez/sqlite" -A 3 -B 3

Repository: flanksource/duty

Length of output: 2455


🌐 Web query:

clarkmcc gorm-sqlite fork status maintenance

💡 Result:

I couldn’t find any public GitHub repository/fork named gorm-sqlite under the clarkmcc account via web search (it may be renamed, deleted, or private).

If you’re evaluating “maintenance status” for a GORM SQLite driver, the actively maintained upstream is go-gorm/sqlite. [1]
If your reason for using a fork is CGO-free / pure-Go SQLite, the commonly referenced and actively released fork is glebarez/sqlite (forked from go-gorm/sqlite). [2]

Sources:
[1] https://github.com/go-gorm/sqlite
[2] https://github.com/glebarez/sqlite/releases


🌐 Web query:

glebarez sqlite library current status 2024 2025

💡 Result:

Status (2024–2025) of the “glebarez SQLite” libraries:

  • github.com/glebarez/sqlite (GORM driver): Latest tagged release is v1.11.0 on March 14, 2024 (“GORM 1.25.7 and upstream merge”). No 2025 release is shown. [1]
  • github.com/glebarez/go-sqlite (database/sql driver used underneath): The repo had code updates in 2025, including commits on Feb 14, 2025 (e.g., bumping modernc.org/sqlite and upgrading Go version). [2] However, package trackers indicate the latest tagged release is much older (“almost 2 years ago”), suggesting updates may not be regularly cut as releases. [3]

Practical takeaway: the GORM wrapper (glebarez/sqlite) looks inactive in releases after Mar 2024, while the underlying driver (glebarez/go-sqlite) saw some maintenance in 2025, but release tagging may lag. [1][2][3]

Sources:
[1] (github.com)
[2] (github.com)
[3] (data.code.gouv.fr)


🏁 Script executed:

# Search for any issue/PR discussions about this fork or SQLite problems
rg "clarkmcc|sqlite" --type-list
rg "sqlite" -i -C 3 | grep -i "fork\|replace\|issue\|bug\|fix" | head -30

Repository: flanksource/duty

Length of output: 635


🏁 Script executed:

# Check git log for when this replace directive was added
git log --oneline --all -S "clarkmcc/gorm-sqlite" | head -20
git log -p -S "clarkmcc/gorm-sqlite" -- go.mod | head -150

Repository: flanksource/duty

Length of output: 146


🏁 Script executed:

# Check if there are any issues or comments in the codebase mentioning SQLite problems
find . -type f -name "*.md" -o -name "*.go" | xargs grep -l "sqlite" 2>/dev/null | head -10
cat .github/ISSUES.md 2>/dev/null || cat README.md 2>/dev/null | grep -A 5 -B 5 "sqlite\|fork" || echo "No relevant docs found"

Repository: flanksource/duty

Length of output: 220


🌐 Web query:

clarkmcc gorm-sqlite github deleted renamed private

💡 Result:

I can’t find any public GitHub repository named clarkmcc/gorm-sqlite via web search right now, which strongly suggests it’s either deleted, renamed, or made private (any of those would make it disappear from public search/indexing).

What is publicly discoverable is the official GORM SQLite driver at go-gorm/sqlite / gorm.io/driver/sqlite. [1] [2]

How to tell which case it is on your side:

  • Renamed: check your local clone:
    • git remote -v (see the exact old URL)
    • git ls-remote <url> (if it responds, it still exists somewhere under a new owner/name; GitHub often redirects in-browser but not always via tooling)
  • Private: git ls-remote / go get will typically fail unless you’re authenticated (PAT/SSH); GitHub web will show “Not Found” to non-members.
  • Deleted: it will be “Not Found” for everyone and there won’t be redirects.

If you paste the exact URL you have in go.mod (or the git remote -v output), I can tell you what it resolves to and suggest the closest replacement.



🌐 Web query:

glebarez sqlite issues 2024 fork reason CGO

💡 Result:

  • Why glebarez/go-sqlite exists / “fork reason”: it’s a pure-Go SQLite driver that embeds a Go port of SQLite (linking to the cznic/sqlite codebase), specifically to avoid the build/runtime friction of CGO-based SQLite drivers. (github.com)
  • CGO context: the common alternative mattn/go-sqlite3 requires CGO (you need CGO_ENABLED=1 and a C toolchain like gcc). Projects have looked at glebarez/go-sqlite mainly to simplify builds/cross-compiles by dropping that CGO requirement. (github.com)
  • Notable 2024 issue reported for the CGO-free ports (incl. glebarez): a March 25, 2024 report says PRAGMA wal_autocheckpoint doesn’t behave as expected in “no-cgo” ports (mentioned: glebarez and modernc), leading to WAL files growing very large and slow reopen times. (gitlab.com)
  • Maintenance/release signal often mentioned in 2024 discussions: glebarez/go-sqlite shows its “Latest” GitHub release as v1.21.2 (Jul 6, 2023), so some teams evaluating CGO removal in 2024 picked other CGO-free options (commonly modernc.org/sqlite). (github.com)


The clarkmcc/gorm-sqlite fork is not publicly accessible and cannot be verified.

The fork does not appear to exist publicly on GitHub (not indexed by web search, likely deleted, private, or renamed), creating an undocumented and opaque dependency. The upstream glebarez/sqlite has not released since v1.11.0 (March 2024) and has a known WAL file handling issue reported in March 2024 affecting CGO-free SQLite implementations.

To maintain transparency and long-term stability:

  • Document why this fork is necessary (the specific issue it solves)
  • If the fork is private, consider evaluating modernc.org/sqlite or another actively maintained CGO-free alternative
  • Verify the fork repository is accessible and will remain available
🤖 Prompt for AI Agents
In `@go.mod` at line 419, The replace directive pointing to
github.com/clarkmcc/gorm-sqlite in go.mod is referencing a
non-public/unverifiable fork; verify and either (A) replace it with a public,
supported alternative (e.g., remove the replace and use
github.com/glebarez/sqlite or switch to modernc.org/sqlite) or (B) ensure the
fork is publicly accessible and document why it’s required. Update go.mod to
point to the chosen public module and version (or remove the replace), and add a
short justification comment in the repository (README or go.mod comment)
describing the specific bug/workaround that required the fork (reference the
replace line and the original github.com/glebarez/sqlite) so reviewers and
future maintainers can verify the dependency choice.

Comment thread .github/workflows/test.yaml Outdated

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @.github/workflows/test.yaml:
- Around lines 74-79: The workflow reference "uses: flanksource/deps" is missing
a pinned `@ref`, which GitHub Actions requires; update the action reference (the
"uses: flanksource/deps" line) to include a specific tag, branch, or commit SHA
(e.g., change to "uses: flanksource/deps@<tag-or-sha>") so the action is pinned
and the workflow runs reliably.
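For reference, a hedged sketch of the pinned form the prompt asks for (the tag and SHA below are placeholders, not verified releases of flanksource/deps):

```yaml
steps:
  # Pin to a release tag...
  - uses: flanksource/deps@v1.2.3  # placeholder tag
  # ...or, stricter, to a full commit SHA (immutable even if tags move)
  - uses: flanksource/deps@0123456789abcdef0123456789abcdef01234567  # placeholder SHA
```

Pinning to a full SHA is the more robust option, since tags can be retargeted after release.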

Comment thread .github/workflows/test.yaml Outdated
Comment thread .github/workflows/test.yaml Outdated