[elastic statestore] extract common logic #49566
Conversation
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed
Signed-off-by: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com>
52e3e3c to
d58ee07
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@libbeat/statestore/backend/es/store.go`:
- Around line 172-177: The Remove method currently calls s.base.Remove without
holding the store mutex, which can race with configure/Close that replace or
close s.base; acquire the store mutex (use s.mx.RLock() / s.mx.RUnlock() or
s.mx.Lock()/Unlock() consistent with how configure/Close synchronize) around
accessing s.base in store.Remove so the call to s.base.Remove is protected, and
check for nil s.base under that lock before invoking Remove; keep the existing
waitReady() invocation as-is but ensure the mutex is held while calling
s.base.Remove to prevent races with configure/Close.
In `@x-pack/otel/extension/elasticsearchstorage/extension.go`:
- Around line 51-52: Access currently returns the raw store from es.NewStore
which exposes a baseStore whose Close shuts down e.client and mutates cli/index
without synchronization; instead create and return a wrapper type (e.g.,
elasticStoreWrapper) from elasticStorage.Access that embeds or implements
backend.Store and delegates all operations to the underlying es.NewStore
instance while (1) preventing the wrapper's Close from closing e.client (client
lifetime is owned by elasticStorage), and (2) serializing calls that touch
baseStore.cli/index with a sync.Mutex (or other synchronization) inside the
wrapper to meet backend.Registry thread-safety guarantees; update
elasticStorage.Access to construct this wrapper around es.NewStore(e.ctx,
e.logger, e.client, name) and ensure any store-specific cleanup in Close only
affects wrapper-local resources, not e.client.
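The wrapper idea above can be sketched as follows. This is an illustration under stated assumptions: `sharedClient`, `innerStore`, `storeWrapper`, and `newWrapper` are hypothetical names, not the types in `extension.go` or the `backend.Store` interface. It only shows the two properties the comment asks for: calls are serialized with a mutex, and the wrapper's `Close` leaves the shared client untouched.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sharedClient is a hypothetical stand-in for the connection owned by the
// extension. Only the extension is supposed to close it.
type sharedClient struct{ closed bool }

func (c *sharedClient) Close() { c.closed = true }

// innerStore is a hypothetical stand-in for the store built on that client.
type innerStore struct {
	cli  *sharedClient
	data map[string]string
}

// storeWrapper serializes access to the inner store and owns no client.
type storeWrapper struct {
	mx     sync.Mutex
	inner  *innerStore
	closed bool
}

func newWrapper(cli *sharedClient) *storeWrapper {
	return &storeWrapper{inner: &innerStore{cli: cli, data: map[string]string{}}}
}

func (w *storeWrapper) Set(key, value string) error {
	w.mx.Lock()
	defer w.mx.Unlock()
	if w.closed {
		return errors.New("store is closed")
	}
	w.inner.data[key] = value
	return nil
}

// Close marks only this wrapper as closed. It deliberately does not call
// inner.cli.Close, because the client's lifetime belongs to the extension.
func (w *storeWrapper) Close() error {
	w.mx.Lock()
	defer w.mx.Unlock()
	w.closed = true
	return nil
}

func main() {
	cli := &sharedClient{}
	a, b := newWrapper(cli), newWrapper(cli)
	_ = a.Close()
	fmt.Println(cli.closed)           // false: the shared client is still open
	fmt.Println(b.Set("k", "v"))      // <nil>: the other store keeps working
	fmt.Println(a.Set("k", "v") != nil) // true: the closed wrapper rejects writes
}
```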
📒 Files selected for processing (6)
- libbeat/statestore/backend/es/base.go
- libbeat/statestore/backend/es/store.go
- x-pack/otel/extension/elasticsearchstorage/extension.go
- x-pack/otel/extension/elasticsearchstorage/extension_test.go
- x-pack/otel/extension/elasticsearchstorage/factory.go
- x-pack/otel/extension/elasticsearchstorage/store.go
💤 Files with no reviewable changes (1)
- x-pack/otel/extension/elasticsearchstorage/store.go
ad6e04f to
757dc4b
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@libbeat/statestore/backend/es/base.go`:
- Line 62: Replace use of url.QueryEscape / url.QueryUnescape for document IDs
embedded in request paths with url.PathEscape / url.PathUnescape to avoid
converting '+' to space and corrupting keys; specifically update calls around
the b.cli.Request call that build the path (e.g., the line using
fmt.Sprintf("/%s/%s/%s", b.index, docType, url.QueryEscape(key))) and any other
places in this package that use url.QueryEscape/QueryUnescape for path segments
(used by methods that read, iterate, or delete state IDs such as Each/get/remove
helpers), ensuring you call url.PathEscape when encoding keys into the URL path
and url.PathUnescape when decoding them.
In `@libbeat/statestore/backend/es/store.go`:
- Around line 93-100: SetID currently only updates the transient s.base instance
so a subsequent configure() (which recreates s.base via NewStore(..., s.name))
loses the custom ID; persist the override by adding a store-scoped field (e.g.,
customID or idOverride) and update SetID to set that field under lock and also
call s.base.SetID(id); then modify configure (the code path that calls
NewStore(..., s.name) and replaces s.base) to, after replacing s.base, reapply
the persisted id override (if non-empty) by calling s.base.SetID(s.customID)
under the same mutex. Ensure all accesses use s.mx and apply the same change for
the other similar SetID override occurrence referenced (around lines 203-220).
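A minimal sketch of persisting the override, assuming simplified hypothetical types (`base`, `newBase`, `store`, and the `index` helper are illustrative and are not the PR's `baseStore` or `NewStore`). It records the ID on the outer store under the mutex and reapplies it every time the inner store is rebuilt.

```go
package main

import (
	"fmt"
	"sync"
)

// base is a hypothetical stand-in for the inner store; index is where it writes.
type base struct{ index string }

func newBase(name string) *base   { return &base{index: name} }
func (b *base) SetID(id string)   { b.index = id }

// store is a hypothetical stand-in for the outer store.
type store struct {
	mx   sync.Mutex
	name string
	id   string // persisted override; empty means "use the default from name"
	base *base
}

// SetID records the override first, so it survives even if no inner store
// exists yet, and then applies it to the current inner store if there is one.
func (s *store) SetID(id string) {
	s.mx.Lock()
	defer s.mx.Unlock()
	s.id = id
	if s.base != nil {
		s.base.SetID(id)
	}
}

// configure rebuilds the inner store (as a reconnect or reload would) and
// reapplies the persisted override.
func (s *store) configure() {
	s.mx.Lock()
	defer s.mx.Unlock()
	s.base = newBase(s.name)
	if s.id != "" {
		s.base.SetID(s.id)
	}
}

func (s *store) index() string {
	s.mx.Lock()
	defer s.mx.Unlock()
	return s.base.index
}

func main() {
	s := &store{name: "default-index"}
	s.configure()
	s.SetID("custom-index")
	s.configure() // simulate a reconnect
	fmt.Println(s.index()) // custom-index
}
```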
📒 Files selected for processing (3)
- libbeat/statestore/backend/es/base.go
- libbeat/statestore/backend/es/store.go
- x-pack/otel/extension/elasticsearchstorage/extension.go
🚧 Files skipped from review as they are similar to previous changes (1)
- x-pack/otel/extension/elasticsearchstorage/extension.go
Moving it to draft as I'm figuring out a few things.
757dc4b to
cea27bc
♻️ Duplicate comments (2)
libbeat/statestore/backend/es/base.go (1)
72-72: ⚠️ Potential issue | 🔴 Critical
Use path escaping for document IDs.
Lines 72/175/191 put document IDs into request paths with `url.QueryEscape`, and Line 151 decodes `_id` with `url.QueryUnescape`. That breaks round-tripping for keys containing `+`, so `Each` can return a different key than `Set`/`Get`/`Remove` stored.
Minimal fix:

    - status, data, err := b.cli.Request("GET", fmt.Sprintf("/%s/%s/%s", b.index, docType, url.QueryEscape(key)), "", nil, nil)
    + status, data, err := b.cli.Request("GET", fmt.Sprintf("/%s/%s/%s", b.index, docType, url.PathEscape(key)), "", nil, nil)
    @@
    - key, err := url.QueryUnescape(sres.ID)
    + key, err := url.PathUnescape(sres.ID)
    @@
    - _, _, err := b.cli.Request("PUT", fmt.Sprintf("/%s/%s/%s", b.index, docType, url.QueryEscape(key)), "", nil, doc)
    + _, _, err := b.cli.Request("PUT", fmt.Sprintf("/%s/%s/%s", b.index, docType, url.PathEscape(key)), "", nil, doc)
    @@
    - _, _, err := b.cli.Delete(b.index, docType, url.QueryEscape(key), nil)
    + _, _, err := b.cli.Delete(b.index, docType, url.PathEscape(key), nil)

Also applies to: 151-151, 175-175, 191-191
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libbeat/statestore/backend/es/base.go` at line 72, the code uses url.QueryEscape for document IDs in b.cli.Request calls (e.g., the GET at b.cli.Request("GET", fmt.Sprintf("/%s/%s/%s", b.index, docType, url.QueryEscape(key))...)) and url.QueryUnescape when decoding _id, which breaks round-tripping for keys with '+'. Replace url.QueryEscape with url.PathEscape when constructing request paths for document IDs and replace url.QueryUnescape with url.PathUnescape when decoding _id so IDs are escaped/unescaped using path semantics; update all occurrences (the GET call, the other b.cli.Request usages and the _id decode) accordingly.
libbeat/statestore/backend/es/store.go (1)
42-57: ⚠️ Potential issue | 🔴 Critical
Persist the `SetID` override across reconfigure.
Line 94 returns before recording the override, and Line 219 recreates `s.base` from `s.name`. After any reconnect or reload, the store silently falls back to the default index, which can split state across indices and make existing state look lost.
Minimal fix:

      type store struct {
          ctx      context.Context
          cn       context.CancelFunc
          log      *logp.Logger
          name     string
          notifier *Notifier
    @@
          mx     sync.Mutex
          cli    *eslegclient.Connection
          cliErr error
    +     id     string
          base   *baseStore
      }
    @@
      func (s *store) SetID(id string) {
    +     s.mx.Lock()
    +     s.id = id
    +     s.mx.Unlock()
    +
          if err := s.waitReady(); err != nil {
              return
          }
          s.mx.Lock()
          defer s.mx.Unlock()
    -     s.base.SetID(id)
    +     s.base.SetID(s.id)
      }
    @@
          } else {
              s.base = NewStore(ctx, s.log, cli, s.name)
    +         if s.id != "" {
    +             s.base.SetID(s.id)
    +         }
              s.cli = cli
          }

Also applies to: 93-100, 203-220
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libbeat/statestore/backend/es/store.go` around lines 42 - 57, the store currently applies a SetID override but doesn't persist it, so when s.base is recreated (e.g., during reconfigure/reconnect where s.base is reinitialized from s.name) the override is lost and the index falls back to the default; to fix, persist the override on the store object and reuse it whenever s.base is (re)created: record the override before any early return in the SetID path (reference SetID and the code path that currently returns early), add a field on store to hold the override (or update baseStore construction to accept the override), and update the logic that recreates s.base (the reconfigure/reconnect code that currently recreates s.base from s.name) to apply the persisted override instead of deriving it only from s.name. Ensure all places that rebuild s.base (reconnect, reload) consult the persisted override field.
📒 Files selected for processing (3)
- libbeat/statestore/backend/es/base.go
- libbeat/statestore/backend/es/store.go
- x-pack/otel/extension/elasticsearchstorage/extension.go
🚧 Files skipped from review as they are similar to previous changes (1)
- x-pack/otel/extension/elasticsearchstorage/extension.go
Signed-off-by: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com>
cea27bc to
547a4b4
belimawr left a comment:
Overall LGTM.
However, in store.go there are some structs and functions, like queryResult, entry, doc, and renderRequest, that are only used in base.go. Moving them to base.go would keep things more organised.
Aside from that, could you add some instructions on how to test this PR? I saw your note, but could you list some of those tests?
This PR refactors the code and extracts common logic between `backend/es` and `extension/elasticsearchstorage`. It creates a new store and wraps the existing notifier mechanism around it.
Testing
Existing integration tests in filebeat and otel cover this extensively. They're green and data is ingested as expected.
- `TestHTTPJSONInputReloadUnderElasticAgentWithElasticStateStore` in `managerv2_test.go` tests the state store with filebeat.
- `TestFilebeatOTelHTTPJSONInputWithElasticStateStore` in `otel_test.go` tests the state store extension with filebeat receiver.
Closes: #49350