
[elastic statestore] extract common logic #49566

Merged
VihasMakwana merged 7 commits into elastic:main from VihasMakwana:extract-common-logic
Mar 24, 2026


@VihasMakwana
Contributor

@VihasMakwana VihasMakwana commented Mar 19, 2026

This PR refactors the code and extracts common logic between backend/es and extension/elasticsearchstorage. It creates a new store and wraps the existing notifier mechanism around it.

Testing

Existing integration tests in filebeat and otel cover this extensively. They're green and data is ingested as expected.

  1. TestHTTPJSONInputReloadUnderElasticAgentWithElasticStateStore in managerv2_test.go tests the state store with filebeat.
  2. TestFilebeatOTelHTTPJSONInputWithElasticStateStore in otel_test.go tests the state store extension with filebeat receiver.

Closes: #49350

@botelastic botelastic bot added the needs_team label (Indicates that the issue/PR needs a Team:* label) Mar 19, 2026
@github-actions
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@mergify
Contributor

mergify bot commented Mar 19, 2026

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @VihasMakwana? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@VihasMakwana VihasMakwana added the Team:Elastic-Agent-Data-Plane (Label for the Agent Data Plane team) and skip-changelog labels Mar 19, 2026
@botelastic botelastic bot removed the needs_team label (Indicates that the issue/PR needs a Team:* label) Mar 19, 2026
Signed-off-by: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com>
@VihasMakwana VihasMakwana force-pushed the extract-common-logic branch from 52e3e3c to d58ee07 Compare March 19, 2026 19:31
@VihasMakwana VihasMakwana marked this pull request as ready for review March 20, 2026 08:28
@VihasMakwana VihasMakwana requested a review from a team as a code owner March 20, 2026 08:28
@elasticmachine
Contributor

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@coderabbitai

coderabbitai bot commented Mar 20, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9dd487ff-d565-4998-95c5-50eaef67f8c8

📥 Commits

Reviewing files that changed from the base of the PR and between 547a4b4 and 1fe41f7.

📒 Files selected for processing (2)
  • libbeat/statestore/backend/es/base.go
  • libbeat/statestore/backend/es/store.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • libbeat/statestore/backend/es/base.go

📝 Walkthrough

Walkthrough

A new Elasticsearch-backed baseStore was added at libbeat/statestore/backend/es/base.go with NewStore and implementations of Get, Has, Set, Remove, Each, Close, and SetID. The libbeat store was refactored to delegate to this baseStore. The x-pack/otel extension now stores the start ctx, uses *logp.Logger, constructs stores via es.NewStore, and the previous x-pack/otel elasticsearchstorage/store.go (and its local ErrKeyUnknown) was removed; tests were updated accordingly.

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name | Status | Explanation
Linked Issues check | ✅ Passed | PR successfully extracts common Elasticsearch state store logic into libbeat/statestore/backend/es/base.go, eliminating duplication between backend/es and extension/elasticsearchstorage implementations.
Out of Scope Changes check | ✅ Passed | All changes directly support extracting common logic. Removed duplicate store implementations and refactored both consumers to use the shared baseStore, with no unrelated modifications.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@libbeat/statestore/backend/es/store.go`:
- Around line 172-177: The Remove method currently calls s.base.Remove without
holding the store mutex, which can race with configure/Close that replace or
close s.base; acquire the store mutex (use s.mx.RLock() / s.mx.RUnlock() or
s.mx.Lock()/Unlock() consistent with how configure/Close synchronize) around
accessing s.base in store.Remove so the call to s.base.Remove is protected, and
check for nil s.base under that lock before invoking Remove; keep the existing
waitReady() invocation as-is but ensure the mutex is held while calling
s.base.Remove to prevent races with configure/Close.

In `@x-pack/otel/extension/elasticsearchstorage/extension.go`:
- Around line 51-52: Access currently returns the raw store from es.NewStore
which exposes a baseStore whose Close shuts down e.client and mutates cli/index
without synchronization; instead create and return a wrapper type (e.g.,
elasticStoreWrapper) from elasticStorage.Access that embeds or implements
backend.Store and delegates all operations to the underlying es.NewStore
instance while (1) preventing the wrapper's Close from closing e.client (client
lifetime is owned by elasticStorage), and (2) serializing calls that touch
baseStore.cli/index with a sync.Mutex (or other synchronization) inside the
wrapper to meet backend.Registry thread-safety guarantees; update
elasticStorage.Access to construct this wrapper around es.NewStore(e.ctx,
e.logger, e.client, name) and ensure any store-specific cleanup in Close only
affects wrapper-local resources, not e.client.
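
The wrapper idea above could look roughly like the following sketch. Everything here is hypothetical (`sharedClient`, `innerStore`, `storeWrapper`); it does not use the real `backend.Store` interface or `es.NewStore`, and only illustrates a `Close` that leaves the shared client alone plus a mutex that serializes delegated calls.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedClient is a hypothetical stand-in for the connection owned by the extension.
type sharedClient struct{ closed bool }

func (c *sharedClient) Close() { c.closed = true }

// innerStore is a hypothetical stand-in for the store built on the shared client.
// Its Close also closes the client, which is the behavior the wrapper hides.
type innerStore struct {
	cli  *sharedClient
	data map[string]string
}

func (s *innerStore) Set(key, value string) { s.data[key] = value }

func (s *innerStore) Close() error {
	s.cli.Close()
	return nil
}

// storeWrapper serializes access to the inner store and does not own the client.
type storeWrapper struct {
	mx    sync.Mutex
	inner *innerStore
}

// Set delegates to the inner store while holding the wrapper's mutex.
func (w *storeWrapper) Set(key, value string) {
	w.mx.Lock()
	defer w.mx.Unlock()
	w.inner.Set(key, value)
}

// Close deliberately does not call inner.Close, so the shared client stays open.
func (w *storeWrapper) Close() error {
	return nil
}

func main() {
	cli := &sharedClient{}
	w := &storeWrapper{inner: &innerStore{cli: cli, data: map[string]string{}}}
	w.Set("cursor", "42")
	_ = w.Close()
	fmt.Println("client closed after wrapper.Close:", cli.closed)
}
```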

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7c68a32b-c1be-4506-b27d-2a6b647f6dcb

📥 Commits

Reviewing files that changed from the base of the PR and between eff0889 and d58ee07.

📒 Files selected for processing (6)
  • libbeat/statestore/backend/es/base.go
  • libbeat/statestore/backend/es/store.go
  • x-pack/otel/extension/elasticsearchstorage/extension.go
  • x-pack/otel/extension/elasticsearchstorage/extension_test.go
  • x-pack/otel/extension/elasticsearchstorage/factory.go
  • x-pack/otel/extension/elasticsearchstorage/store.go
💤 Files with no reviewable changes (1)
  • x-pack/otel/extension/elasticsearchstorage/store.go

@VihasMakwana VihasMakwana force-pushed the extract-common-logic branch 2 times, most recently from ad6e04f to 757dc4b Compare March 20, 2026 15:25

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@libbeat/statestore/backend/es/base.go`:
- Line 62: Replace use of url.QueryEscape / url.QueryUnescape for document IDs
embedded in request paths with url.PathEscape / url.PathUnescape to avoid
converting '+' to space and corrupting keys; specifically update calls around
the b.cli.Request call that build the path (e.g., the line using
fmt.Sprintf("/%s/%s/%s", b.index, docType, url.QueryEscape(key))) and any other
places in this package that use url.QueryEscape/QueryUnescape for path segments
(used by methods that read, iterate, or delete state IDs such as Each/get/remove
helpers), ensuring you call url.PathEscape when encoding keys into the URL path
and url.PathUnescape when decoding them.

In `@libbeat/statestore/backend/es/store.go`:
- Around line 93-100: SetID currently only updates the transient s.base instance
so a subsequent configure() (which recreates s.base via NewStore(..., s.name))
loses the custom ID; persist the override by adding a store-scoped field (e.g.,
customID or idOverride) and update SetID to set that field under lock and also
call s.base.SetID(id); then modify configure (the code path that calls
NewStore(..., s.name) and replaces s.base) to, after replacing s.base, reapply
the persisted id override (if non-empty) by calling s.base.SetID(s.customID)
under the same mutex. Ensure all accesses use s.mx and apply the same change for
the other similar SetID override occurrence referenced (around lines 203-220).
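
As a runnable illustration of the suggestion above, the sketch below keeps the override on the outer store and reapplies it whenever the base is rebuilt. The types `base` and `store`, the `newBase` constructor, and the `index` accessor are hypothetical and much simpler than the real code; in particular, there is no `waitReady` step here.

```go
package main

import (
	"fmt"
	"sync"
)

// base is a hypothetical stand-in for the shared store; index is what SetID overrides.
type base struct{ index string }

// newBase builds a base whose index defaults to the store name.
func newBase(name string) *base { return &base{index: name} }

func (b *base) SetID(id string) { b.index = id }

// store is a hypothetical wrapper that survives reconfiguration.
type store struct {
	mx   sync.Mutex
	name string
	id   string // persisted override; empty means "use name"
	base *base
}

// SetID records the override first, then applies it to the current base if any.
func (s *store) SetID(id string) {
	s.mx.Lock()
	defer s.mx.Unlock()
	s.id = id
	if s.base != nil {
		s.base.SetID(id)
	}
}

// configure rebuilds the base and reapplies the persisted override.
func (s *store) configure() {
	s.mx.Lock()
	defer s.mx.Unlock()
	s.base = newBase(s.name)
	if s.id != "" {
		s.base.SetID(s.id)
	}
}

// index reports the index currently in use by the base.
func (s *store) index() string {
	s.mx.Lock()
	defer s.mx.Unlock()
	if s.base == nil {
		return ""
	}
	return s.base.index
}

func main() {
	s := &store{name: "filebeat"}
	s.configure()
	s.SetID("custom")
	s.configure() // simulate a reconnect or reload
	fmt.Println(s.index())
}
```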

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d41c2297-7d3d-4eb6-aba2-55c58ce0c798

📥 Commits

Reviewing files that changed from the base of the PR and between d58ee07 and ad6e04f.

📒 Files selected for processing (3)
  • libbeat/statestore/backend/es/base.go
  • libbeat/statestore/backend/es/store.go
  • x-pack/otel/extension/elasticsearchstorage/extension.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • x-pack/otel/extension/elasticsearchstorage/extension.go

@VihasMakwana
Contributor Author

Moving it to draft as I'm figuring out a few things

@VihasMakwana VihasMakwana marked this pull request as draft March 20, 2026 15:29
@VihasMakwana VihasMakwana force-pushed the extract-common-logic branch from 757dc4b to cea27bc Compare March 20, 2026 15:32

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (2)
libbeat/statestore/backend/es/base.go (1)

72-72: ⚠️ Potential issue | 🔴 Critical

Use path escaping for document IDs.

Lines 72/175/191 put document IDs into request paths with url.QueryEscape, and Line 151 decodes _id with url.QueryUnescape. That breaks round-tripping for keys containing +, so Each can return a different key than Set/Get/Remove stored.

Minimal fix
-	status, data, err := b.cli.Request("GET", fmt.Sprintf("/%s/%s/%s", b.index, docType, url.QueryEscape(key)), "", nil, nil)
+	status, data, err := b.cli.Request("GET", fmt.Sprintf("/%s/%s/%s", b.index, docType, url.PathEscape(key)), "", nil, nil)
@@
-		key, err := url.QueryUnescape(sres.ID)
+		key, err := url.PathUnescape(sres.ID)
@@
-	_, _, err := b.cli.Request("PUT", fmt.Sprintf("/%s/%s/%s", b.index, docType, url.QueryEscape(key)), "", nil, doc)
+	_, _, err := b.cli.Request("PUT", fmt.Sprintf("/%s/%s/%s", b.index, docType, url.PathEscape(key)), "", nil, doc)
@@
-	_, _, err := b.cli.Delete(b.index, docType, url.QueryEscape(key), nil)
+	_, _, err := b.cli.Delete(b.index, docType, url.PathEscape(key), nil)

Also applies to: 151-151, 175-175, 191-191

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libbeat/statestore/backend/es/base.go` at line 72, The code uses
url.QueryEscape for document IDs in b.cli.Request calls (e.g., the GET at
b.cli.Request("GET", fmt.Sprintf("/%s/%s/%s", b.index, docType,
url.QueryEscape(key))...)) and url.QueryUnescape when decoding _id, which breaks
round-tripping for keys with '+'. Replace url.QueryEscape with url.PathEscape
when constructing request paths for document IDs and replace url.QueryUnescape
with url.PathUnescape when decoding _id so IDs are escaped/unescaped using path
semantics; update all occurrences (the GET call, the other b.cli.Request usages
and the _id decode) accordingly.
libbeat/statestore/backend/es/store.go (1)

42-57: ⚠️ Potential issue | 🔴 Critical

Persist the SetID override across reconfigure.

Line 94 returns before recording the override, and Line 219 recreates s.base from s.name. After any reconnect or reload, the store silently falls back to the default index, which can split state across indices and make existing state look lost.

Minimal fix
 type store struct {
 	ctx      context.Context
 	cn       context.CancelFunc
 	log      *logp.Logger
 	name     string
 	notifier *Notifier
@@
 	mx     sync.Mutex
 	cli    *eslegclient.Connection
 	cliErr error
+	id     string
 
 	base *baseStore
 }
@@
 func (s *store) SetID(id string) {
+	s.mx.Lock()
+	s.id = id
+	s.mx.Unlock()
+
 	if err := s.waitReady(); err != nil {
 		return
 	}
 	s.mx.Lock()
 	defer s.mx.Unlock()
 
-	s.base.SetID(id)
+	s.base.SetID(s.id)
 }
@@
 	} else {
 		s.base = NewStore(ctx, s.log, cli, s.name)
+		if s.id != "" {
+			s.base.SetID(s.id)
+		}
 		s.cli = cli
 	}

Also applies to: 93-100, 203-220

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libbeat/statestore/backend/es/store.go` around lines 42 - 57, The store
currently applies a SetID override but doesn't persist it, so when s.base is
recreated (e.g., during reconfigure/reconnect where s.base is reinitialized from
s.name) the override is lost and the index falls back to the default; to fix,
persist the override on the store object and reuse it whenever s.base is
(re)created: record the override before any early return in the SetID path
(reference SetID and the code path that currently returns early), add a field on
store to hold the override (or update baseStore construction to accept the
override), and update the logic that recreates s.base (the reconfigure/reconnect
code that currently recreates s.base from s.name) to apply the persisted
override instead of deriving it only from s.name. Ensure all places that rebuild
s.base (reconnect, reload) consult the persisted override field.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 103eb318-c483-4183-b151-eda6b264bf2e

📥 Commits

Reviewing files that changed from the base of the PR and between ad6e04f and 757dc4b.

📒 Files selected for processing (3)
  • libbeat/statestore/backend/es/base.go
  • libbeat/statestore/backend/es/store.go
  • x-pack/otel/extension/elasticsearchstorage/extension.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • x-pack/otel/extension/elasticsearchstorage/extension.go

Signed-off-by: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com>
@VihasMakwana VihasMakwana force-pushed the extract-common-logic branch from cea27bc to 547a4b4 Compare March 20, 2026 15:36
@VihasMakwana VihasMakwana marked this pull request as ready for review March 23, 2026 11:31
Contributor

@belimawr belimawr left a comment

Overall LGTM.

However, store.go has some structs and functions (queryResult, entry, doc, and renderRequest) that are only used in base.go; moving them to base.go would keep things more organised.

Aside from that, could you add some instructions on how to test this PR? I saw your note, but could you list some of those tests?

@VihasMakwana VihasMakwana requested a review from belimawr March 24, 2026 06:00
@VihasMakwana VihasMakwana merged commit eab6bd7 into elastic:main Mar 24, 2026
206 of 207 checks passed
@github-actions github-actions bot mentioned this pull request Mar 24, 2026

Labels

skip-changelog, Team:Elastic-Agent-Data-Plane (Label for the Agent Data Plane team)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Extract common logic between ES state store and ES state store extension

3 participants