Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 96 additions & 58 deletions go.mod

Large diffs are not rendered by default.

359 changes: 222 additions & 137 deletions go.sum

Large diffs are not rendered by default.

178 changes: 167 additions & 11 deletions logs/opensearch/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,43 @@ package opensearch
import (
"encoding/json"
"io"
"os"
"strconv"
"strings"
"time"

opensearch "github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchtransport"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/logs"
)

type searcher struct {
// Searcher queries an OpenSearch backend and converts search hits into
// generic log results using a configurable field mapping.
type Searcher struct {
	client        *opensearch.Client       // connected opensearch driver client
	config        *Backend                 // backend connection settings used to build the client
	mappingConfig *logs.FieldMappingConfig // maps document fields onto logs.LogLine fields
}

func New(ctx context.Context, backend Backend, mappingConfig *logs.FieldMappingConfig) (*searcher, error) {
// RawClientMixin is implemented by searchers that can expose their
// underlying driver client for callers needing raw, backend-specific access.
//
// NOTE(review): Searcher.GetRawClient returns *opensearch.Client, not any,
// so *Searcher does not satisfy this interface as written — confirm which
// signature is intended.
type RawClientMixin interface {
	GetRawClient() any
}

// GetRawClient returns the underlying opensearch client so callers can
// issue requests not covered by this package's API.
//
// NOTE(review): the return type is *opensearch.Client, so this does not
// satisfy RawClientMixin (which expects `any`) — confirm intent.
func (t *Searcher) GetRawClient() *opensearch.Client {
	return t.client
}

func New(ctx context.Context, backend Backend, mappingConfig *logs.FieldMappingConfig) (*Searcher, error) {
cfg := opensearch.Config{
Addresses: []string{backend.Address},
}

if ctx.Logger.V(3).Enabled() {
cfg.Logger = &opensearchtransport.ColorLogger{
Output: os.Stderr,
}
}

if backend.Username != nil {
username, err := ctx.GetEnvValueFromCache(*backend.Username, ctx.GetNamespace())
if err != nil {
Expand Down Expand Up @@ -53,14 +70,14 @@ func New(ctx context.Context, backend Backend, mappingConfig *logs.FieldMappingC
return nil, ctx.Oops().Errorf("[opensearch] got ping response: %d", pingResp.StatusCode)
}

return &searcher{
return &Searcher{
client: client,
config: &backend,
mappingConfig: mappingConfig,
}, nil
}

func (t *searcher) Search(ctx context.Context, q Request) (*logs.LogResult, error) {
func (t *Searcher) Search(ctx context.Context, q Request) (*logs.LogResult, error) {
if q.Index == "" {
return nil, ctx.Oops().Errorf("index is empty")
}
Expand Down Expand Up @@ -101,6 +118,148 @@ func (t *searcher) Search(ctx context.Context, q Request) (*logs.LogResult, erro
return nil, ctx.Oops().Wrapf(err, "error parsing the response body")
}

logResult := t.parseSearchResponse(ctx, r)
return logResult, nil
}

// DefaultFieldMappingConfig maps common OpenSearch document fields onto the
// generic log-line fields used by the logs package.
// NOTE(review): "log" as the severity source field looks unusual — confirm
// it is not meant to be "level" or similar.
var DefaultFieldMappingConfig = logs.FieldMappingConfig{
	Message:   []string{"message"},
	Timestamp: []string{"@timestamp"},
	Severity:  []string{"log"},
}

// SearchWithScroll initiates a scroll search for large result sets
func (t *Searcher) SearchWithScroll(ctx context.Context, req ScrollRequest) (*logs.LogResult, string, error) {
const defaultScrollSize = 1000
const defaultScrollTimeout = time.Minute

scrollSize := req.Scroll.Size
if scrollSize <= 0 {
scrollSize = defaultScrollSize
}

scrollTimeout := req.Scroll.Timeout
if scrollTimeout == 0 {
scrollTimeout = defaultScrollTimeout
}

if req.Index == "" {
return nil, "", ctx.Oops().Errorf("index is empty")
}

res, err := t.client.Search(
t.client.Search.WithContext(ctx),
t.client.Search.WithIndex(req.Index),
t.client.Search.WithBody(strings.NewReader(req.Query)),
t.client.Search.WithSize(scrollSize),
t.client.Search.WithScroll(scrollTimeout),
t.client.Search.WithErrorTrace(),
)
if err != nil {
return nil, "", ctx.Oops().Wrapf(err, "error initiating scroll search")
}
defer res.Body.Close()

if res.IsError() {
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, "", ctx.Oops().Wrapf(err, "failed to read error response body from opensearch")
}
return nil, "", ctx.Oops().Errorf("opensearch: scroll search failed with status %s: %s", res.Status(), string(body))
}

var r Response
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return nil, "", ctx.Oops().Wrapf(err, "error parsing the scroll response body")
}

logResult := t.parseSearchResponse(ctx, r)
return logResult, r.ScrollID, nil
}

// ScrollNext retrieves the next batch of results using the scroll ID
func (t *Searcher) ScrollNext(ctx context.Context, scrollID string, scrollTimeout time.Duration) (*logs.LogResult, string, error) {
if scrollTimeout == 0 {
scrollTimeout = time.Minute
}

res, err := t.client.Scroll(
t.client.Scroll.WithContext(ctx),
t.client.Scroll.WithScrollID(scrollID),
t.client.Scroll.WithScroll(scrollTimeout),
t.client.Scroll.WithErrorTrace(),
)
if err != nil {
return nil, "", ctx.Oops().Wrapf(err, "error continuing scroll search")
}
defer res.Body.Close()

if res.IsError() {
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, "", ctx.Oops().Wrapf(err, "failed to read error response body from opensearch scroll")
}
return nil, "", ctx.Oops().Errorf("opensearch: scroll next failed with status %s: %s", res.Status(), string(body))
}

var r Response
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return nil, "", ctx.Oops().Wrapf(err, "error parsing the scroll next response body")
}

logResult := t.parseSearchResponse(ctx, r)
return logResult, r.ScrollID, nil
}

// ClearScroll cleans up the scroll context
func (t *Searcher) ClearScroll(ctx context.Context, scrollID string) error {
res, err := t.client.ClearScroll(
t.client.ClearScroll.WithContext(ctx),
t.client.ClearScroll.WithScrollID(scrollID),
)
if err != nil {
return ctx.Oops().Wrapf(err, "error clearing scroll")
}
defer res.Body.Close()

if res.IsError() {
body, err := io.ReadAll(res.Body)
if err != nil {
return ctx.Oops().Wrapf(err, "failed to read error response body from clear scroll")
}
return ctx.Oops().Errorf("opensearch: clear scroll failed with status %s: %s", res.Status(), string(body))
}

return nil
}

// preprocessJSONFields attempts to unmarshal JSON from fields ending with @json
// It modifies the input map in place, replacing string values with unmarshalled JSON where possible
func preprocessJSONFields(source map[string]any) {
for key, value := range source {
// Check if field name ends with @json
if !strings.HasSuffix(key, "@json") {
continue
}

// Only attempt to unmarshal string values
strValue, ok := value.(string)
if !ok {
continue
}

// Attempt to unmarshal the JSON string
var jsonValue any
if err := json.Unmarshal([]byte(strValue), &jsonValue); err == nil {
// Successfully unmarshalled, replace the value
source[key] = jsonValue
}
// On error, leave the original string value unchanged (treat as text)
}
}
Comment on lines +236 to +259
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Implementation only handles @json suffix, but tests expect @input too.

The function comment mentions "fields ending with @JSON" but the test file includes test cases for @input suffix fields (e.g., payload@input, metadata@input). The implementation at line 241 only checks for @json.

-// preprocessJSONFields attempts to unmarshal JSON from fields ending with @json
+// preprocessJSONFields attempts to unmarshal JSON from fields ending with @json or @input
 // It modifies the input map in place, replacing string values with unmarshalled JSON where possible
 func preprocessJSONFields(source map[string]any) {
 	for key, value := range source {
-		// Check if field name ends with @json
-		if !strings.HasSuffix(key, "@json") {
+		// Check if field name ends with @json or @input
+		if !strings.HasSuffix(key, "@json") && !strings.HasSuffix(key, "@input") {
 			continue
 		}
🤖 Prompt for AI Agents
In logs/opensearch/search.go around lines 236 to 259, the function only checks
for the "@json" suffix but tests expect it to treat fields with "@input" the
same way; update the suffix check to handle both "@json" and "@input" (e.g., if
strings.HasSuffix(key, "@json") || strings.HasSuffix(key, "@input")), keep the
same behavior of only unmarshalling string values and replacing them when
json.Unmarshal succeeds, and update the function comment to document that both
"@json" and "@input" suffixes are processed.


// parseSearchResponse extracts log lines from search response
func (t *Searcher) parseSearchResponse(ctx context.Context, r Response) *logs.LogResult {
var logResult = logs.LogResult{}
logResult.Logs = make([]*logs.LogLine, 0, len(r.Hits.Hits))

Expand All @@ -115,6 +274,9 @@ func (t *searcher) Search(ctx context.Context, q Request) (*logs.LogResult, erro
Count: 1,
}

// Preprocess JSON fields to unmarshal @json and @input suffixed fields
preprocessJSONFields(hit.Source)

for k, v := range hit.Source {
if err := logs.MapFieldToLogLine(k, v, line, mappingConfig); err != nil {
// Log or handle mapping error? For now, just log it.
Expand All @@ -126,11 +288,5 @@ func (t *searcher) Search(ctx context.Context, q Request) (*logs.LogResult, erro
logResult.Logs = append(logResult.Logs, line)
}

return &logResult, nil
}

var DefaultFieldMappingConfig = logs.FieldMappingConfig{
Message: []string{"message"},
Timestamp: []string{"@timestamp"},
Severity: []string{"log"},
return &logResult
}
Loading
Loading