From 11318eb7f71981ec96fabe98bd9594855ecb4bb1 Mon Sep 17 00:00:00 2001 From: mobilebarn Date: Fri, 27 Mar 2026 22:02:39 +1100 Subject: [PATCH] fix(openclaw-plugin): serialize per-session commits to prevent CONFLICT errors Adds a per-session commit lock (promise chain) in the context engine to serialize overlapping commits from afterTurn, before_reset, and compact fallback paths. The OV server rejects concurrent commits on the same session with CONFLICT; this ensures they run sequentially. --- examples/openclaw-plugin/context-engine.ts | 56 ++++++++++++++-------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/examples/openclaw-plugin/context-engine.ts b/examples/openclaw-plugin/context-engine.ts index dbbf8e960..50d10e397 100644 --- a/examples/openclaw-plugin/context-engine.ts +++ b/examples/openclaw-plugin/context-engine.ts @@ -92,6 +92,8 @@ export type ContextEngineWithSessionMapping = ContextEngine & { resolveOVSession: (sessionKey: string) => Promise; /** Commit (extract + archive) then delete the OV session, so a fresh one is created on next use. */ commitOVSession: (sessionKey: string) => Promise; + /** Serialize a commit operation through the per-session commit lock to prevent CONFLICT errors. */ + commitWithLock: (ovSessionId: string, fn: () => Promise) => Promise; }; type Logger = { @@ -265,19 +267,31 @@ export function createMemoryOpenVikingContextEngine(params: { resolveAgentId, } = params; + // Per-session commit lock to prevent CONFLICT errors from overlapping commits. + const commitLocks = new Map>(); + + async function withCommitLock(sessionId: string, fn: () => Promise): Promise { + const previous = commitLocks.get(sessionId) ?? Promise.resolve(); + const current = previous.then(fn, fn); // run fn after previous settles (success or fail) + commitLocks.set(sessionId, current.then(() => {}, () => {})); // swallow to avoid unhandled rejection on the stored promise + return current; + } + async function doCommitOVSession(sessionKey: string): Promise { - try { - const client = await getClient(); - const agentId = resolveAgentId(sessionKey); - const ovSessionId = mapSessionKeyToOVSessionId(sessionKey); - const commitResult = await client.commitSession(ovSessionId, { wait: true, agentId }); - logger.info( - `openviking: committed OV session for sessionKey=${sessionKey}, ovSessionId=${ovSessionId}, archived=${commitResult.archived ?? false}, memories=${commitResult.memories_extracted ?? 0}, task_id=${commitResult.task_id ?? "none"}`, - ); - await client.deleteSession(ovSessionId, agentId).catch(() => {}); - } catch (err) { - warnOrInfo(logger, `openviking: commit failed for sessionKey=${sessionKey}: ${String(err)}`); - } + const ovSessionId = mapSessionKeyToOVSessionId(sessionKey); + await withCommitLock(ovSessionId, async () => { + try { + const client = await getClient(); + const agentId = resolveAgentId(sessionKey); + const commitResult = await client.commitSession(ovSessionId, { wait: true, agentId }); + logger.info( + `openviking: committed OV session for sessionKey=${sessionKey}, ovSessionId=${ovSessionId}, archived=${commitResult.archived ?? false}, memories=${commitResult.memories_extracted ?? 0}, task_id=${commitResult.task_id ?? "none"}`, + ); + await client.deleteSession(ovSessionId, agentId).catch(() => {}); + } catch (err) { + warnOrInfo(logger, `openviking: commit failed for sessionKey=${sessionKey}: ${String(err)}`); + } + }); } function extractSessionKey(runtimeContext: Record | undefined): string | undefined { @@ -305,6 +319,8 @@ export function createMemoryOpenVikingContextEngine(params: { commitOVSession: doCommitOVSession, + commitWithLock: (ovSessionId: string, fn: () => Promise) => withCommitLock(ovSessionId, fn), + // --- standard ContextEngine methods --- async ingest(): Promise { @@ -368,13 +384,15 @@ export function createMemoryOpenVikingContextEngine(params: { const OVSessionId = sessionKey ? mapSessionKeyToOVSessionId(sessionKey) : afterTurnParams.sessionId; - await client.addSessionMessage(OVSessionId, "user", decision.normalizedText, agentId); - const commitResult = await client.commitSession(OVSessionId, { wait: true, agentId }); - logger.info( - `openviking: committed ${newCount} messages in session=${OVSessionId}, ` + - `archived=${commitResult.archived ?? false}, memories=${commitResult.memories_extracted ?? 0}, ` + - `task_id=${commitResult.task_id ?? "none"} ${toJsonLog({ captured: [trimForLog(turnText, 260)] })}`, - ); + await withCommitLock(OVSessionId, async () => { + await client.addSessionMessage(OVSessionId, "user", decision.normalizedText, agentId); + const commitResult = await client.commitSession(OVSessionId, { wait: true, agentId }); + logger.info( + `openviking: committed ${newCount} messages in session=${OVSessionId}, ` + + `archived=${commitResult.archived ?? false}, memories=${commitResult.memories_extracted ?? 0}, ` + + `task_id=${commitResult.task_id ?? "none"} ${toJsonLog({ captured: [trimForLog(turnText, 260)] })}`, + ); + }); } catch (err) { warnOrInfo(logger, `openviking: auto-capture failed: ${String(err)}`); }