Skip to content

Commit eb846bf

Browse files
affandarCopilot
andcommitted
feat: KV store support (duroxide 0.1.24)
- OrchestrationContext: set_value, get_value, clear_value, clear_all_values, get_value_from_instance - Client: get_value, wait_for_value - Constants: MAX_KV_KEYS, MAX_KV_VALUE_BYTES - 2 KV e2e tests (request/response, cross-orchestration read) - Updated README, CHANGELOG, user-guide, architecture docs - Remove [patch.crates-io] — using published duroxide-pg 0.1.25 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 2094473 commit eb846bf

12 files changed

Lines changed: 380 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [0.1.14] - 2026-03-13
9+
10+
- **KV store support:** Durable key-value store for per-instance state
11+
- `OrchestrationContext`: `set_value()`, `get_value()`, `clear_value()`, `clear_all_values()`
12+
- `Client`: `get_value()`, `wait_for_value()`
13+
- Constants: `MAX_KV_KEYS`, `MAX_KV_VALUE_BYTES`
14+
- Added KV e2e tests (`test_kv_store.py`)
15+
- Bumped duroxide to 0.1.24, duroxide-pg to 0.1.25
16+
817
## [0.1.13] - 2026-03-08
918

1019
### Added

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
[package]
22
name = "duroxide-python"
3-
version = "0.1.13"
3+
version = "0.1.14"
44
edition = "2021"
55

66
[lib]
77
name = "duroxide_python"
88
crate-type = ["cdylib"]
99

1010
[dependencies]
11-
duroxide = { version = "0.1.23", features = ["sqlite"] }
12-
duroxide-pg = { version = "0.1.24" }
11+
duroxide = { version = "0.1.24", features = ["sqlite"] }
12+
duroxide-pg = { version = "0.1.25" }
1313
pyo3 = { version = "0.23", features = ["extension-module"] }
1414
async-trait = "0.1"
1515
tokio = { version = "1", features = ["full"] }

README.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ Write durable workflows as Python generators. The Rust runtime handles replay, p
1616
- **Deterministic replay** — safe resume after crashes
1717
- **SQLite & PostgreSQL** — pluggable storage providers
1818
- **Custom Status**`ctx.set_custom_status()` / `ctx.reset_custom_status()` for orchestration progress reporting, `client.wait_for_status_change()` for efficient polling
19+
- **KV Store** — durable per-instance state via `ctx.set_value()` / `ctx.get_value()` / `ctx.clear_value()` / `ctx.clear_all_values()`, plus `client.get_value()` / `client.wait_for_value()`
1920
- **Event Queues**`ctx.dequeue_event(queue_name)` for FIFO mailbox-style message passing, `client.enqueue_event()` to send messages
2021
- **Retry on Session**`ctx.schedule_activity_with_retry_on_session()` for retry with session affinity
21-
- **Tag Routing** — worker tags for activity affinity (`MAX_WORKER_TAGS=5`, `MAX_TAG_NAME_BYTES=256`)
22+
- **Tag Routing** — worker tags for activity affinity (`MAX_WORKER_TAGS=5`, `MAX_TAG_NAME_BYTES=256`, `MAX_KV_KEYS=10`, `MAX_KV_VALUE_BYTES=16384`)
2223
- **Admin APIs** — instance management, metrics, pruning
2324
- **Activity client access**`ctx.get_client()` lets activities start new orchestrations
2425
- **Runtime metrics**`metrics_snapshot()` for orchestration/activity counters
@@ -178,6 +179,25 @@ if status:
178179
print(status.custom_status_version) # monotonically increasing counter
179180
```
180181

182+
## KV Store
183+
184+
Durable per-instance key-value state for orchestration coordination and request/response patterns:
185+
186+
```python
187+
@runtime.register_orchestration("KvWorkflow")
188+
def kv_workflow(ctx, input):
189+
ctx.set_value("status", "running")
190+
result = yield ctx.schedule_activity("Compute", input)
191+
ctx.set_value("result", str(result))
192+
return result
193+
194+
# External reads
195+
status = client.wait_for_value("instance-1", "status", 10000)
196+
result = client.get_value("instance-1", "result")
197+
```
198+
199+
KV entries are scoped to a single orchestration instance and remain readable after completion until the instance is deleted or pruned.
200+
181201
## Event Queues
182202

183203
Persistent FIFO message passing between clients and orchestrations:

docs/architecture.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,38 @@ client.wait_for_status_change(id, last_version, poll_ms, timeout_ms)
352352

353353
The `custom_status_version` is a monotonically increasing counter incremented on every `set_custom_status()` call. Clients pass their last-seen version to avoid redundant reads.
354354

355+
## KV Store Data Path
356+
357+
KV operations are fire-and-forget orchestration context calls backed by provider state for the current instance.
358+
359+
```
360+
Python Rust (PyO3) Provider (DB)
361+
────── ─────────── ─────────────
362+
ctx.set_value("status", "ready")
363+
364+
└─► orchestration_set_value(instance_id, "status", "ready")
365+
366+
└─► ORCHESTRATION_CTXS.get(instance_id)
367+
368+
└─► ctx.set_value("status", "ready")
369+
370+
└─► provider.upsert KV row for (instance_id, key)
371+
372+
client.get_value(id, "status")
373+
374+
└─► py.allow_threads(|| TOKIO_RT.block_on(client.get_value(id, "status")))
375+
376+
└─► provider.get_value(id, "status")
377+
378+
client.wait_for_value(id, "status", timeout_ms)
379+
380+
└─► py.allow_threads(|| TOKIO_RT.block_on(client.wait_for_value(...)))
381+
382+
└─► repeated provider.get_value(...) polling until key exists or timeout
383+
```
384+
385+
Cross-orchestration reads use the built-in `__duroxide_syscall:get_kv_value` activity. In Python this is exposed as `ctx.get_value_from_instance(instance_id, key)` and replays like any other scheduled activity.
386+
355387
## Event Queue Data Flow
356388

357389
Event queues provide persistent FIFO message passing between external clients and orchestrations. Unlike `wait_for_event()` which matches a single named event, event queues support multiple messages on named queues with guaranteed ordering. Messages survive `continue_as_new`.

docs/user-guide.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,38 @@ while True:
380380
- `result.custom_status` — the custom status string, or `None` if not set
381381
- `result.custom_status_version` — monotonically increasing version counter
382382

383+
## KV Store — Durable Per-Instance State
384+
385+
KV entries are durable metadata scoped to a single orchestration instance. They can be updated from inside the orchestration without yielding and read externally through the client.
386+
387+
```python
388+
@runtime.register_orchestration("RequestServer")
389+
def request_server(ctx, input):
390+
ctx.set_value("status", "ready")
391+
request = yield ctx.wait_for_event("request")
392+
response = request["command"][::-1]
393+
ctx.set_value(f"response:{request['op_id']}", response)
394+
return "done"
395+
396+
status = client.wait_for_value("server-1", "status", 10000)
397+
response = client.get_value("server-1", "response:op-1")
398+
```
399+
400+
**OrchestrationContext methods:**
401+
- `ctx.set_value(key, value)` — set or overwrite a key
402+
- `ctx.get_value(key)` — read the current value for a key in the active instance
403+
- `ctx.clear_value(key)` — remove a single key
404+
- `ctx.clear_all_values()` — clear all keys for the active instance
405+
- `ctx.get_value_from_instance(instance_id, key)` — read another instance's KV via the built-in syscall activity
406+
407+
**Client methods:**
408+
- `client.get_value(instance_id, key)` — read a key immediately
409+
- `client.wait_for_value(instance_id, key, timeout_ms)` — block until the key exists or timeout
410+
411+
**Limits:**
412+
- `MAX_KV_KEYS = 10`
413+
- `MAX_KV_VALUE_BYTES = 16384`
414+
383415
## Event Queues — Persistent FIFO Message Passing
384416

385417
Event queues provide durable, ordered message passing between external clients and orchestrations. Unlike `wait_for_event()` which waits for a single named event, event queues support FIFO ordering with multiple messages on named queues. Messages survive `continue_as_new`.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "maturin"
44

55
[project]
66
name = "duroxide"
7-
version = "0.1.13"
7+
version = "0.1.14"
88
description = "Python SDK for the Duroxide durable execution runtime"
99
readme = "README.md"
1010
license = { text = "MIT" }

python/duroxide/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,21 @@ def wait_for_status_change(
183183
)
184184
return _parse_status(result)
185185

186+
def get_value(self, instance_id: str, key: str) -> "Optional[str]":
187+
"""Read a single KV entry for the given instance."""
188+
return self._native.get_value(instance_id, key)
189+
190+
def wait_for_value(self, instance_id: str, key: str, timeout_ms: int = 30000) -> str:
191+
"""Wait for a KV value to be set on an instance."""
192+
try:
193+
return self._native.wait_for_value(instance_id, key, timeout_ms)
194+
except RuntimeError as exc:
195+
if "timed out" in str(exc).lower():
196+
raise TimeoutError(
197+
f"Timed out waiting for value '{key}' on instance '{instance_id}'"
198+
) from exc
199+
raise
200+
186201
def cancel_instance(self, instance_id: str, reason: str = None):
187202
self._native.cancel_instance(instance_id, reason)
188203

@@ -424,6 +439,8 @@ def metrics_snapshot(self):
424439
# Tag limits
425440
MAX_WORKER_TAGS = 5
426441
MAX_TAG_NAME_BYTES = 256
442+
MAX_KV_KEYS = 10
443+
MAX_KV_VALUE_BYTES = 16384
427444

428445

429446
class TagFilter:
@@ -478,4 +495,6 @@ def default_and(tags: list) -> str:
478495
"parse_result",
479496
"MAX_WORKER_TAGS",
480497
"MAX_TAG_NAME_BYTES",
498+
"MAX_KV_KEYS",
499+
"MAX_KV_VALUE_BYTES",
481500
]

python/duroxide/context.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
orchestration_set_custom_status,
1717
orchestration_reset_custom_status,
1818
orchestration_get_custom_status,
19+
orchestration_set_value,
20+
orchestration_get_value,
21+
orchestration_clear_value,
22+
orchestration_clear_all_values,
1923
activity_trace_log,
2024
activity_is_cancelled,
2125
activity_tag,
@@ -455,6 +459,30 @@ def get_custom_status(self) -> Optional[str]:
455459
"""
456460
return orchestration_get_custom_status(self.instance_id)
457461

462+
def set_value(self, key: str, value: str):
463+
"""Set a key-value pair scoped to this orchestration instance."""
464+
orchestration_set_value(self.instance_id, str(key), str(value))
465+
466+
def get_value(self, key: str) -> Optional[str]:
467+
"""Get the current value for a key. Returns None if not set."""
468+
return orchestration_get_value(self.instance_id, str(key))
469+
470+
def clear_value(self, key: str):
471+
"""Remove a single key from the KV store."""
472+
orchestration_clear_value(self.instance_id, str(key))
473+
474+
def clear_all_values(self):
475+
"""Clear ALL key-value pairs for this orchestration instance."""
476+
orchestration_clear_all_values(self.instance_id)
477+
478+
def get_value_from_instance(self, instance_id: str, key: str) -> ScheduledTask:
479+
"""Read a KV value from another orchestration instance via the built-in syscall activity."""
480+
return ScheduledTask({
481+
"type": "activity",
482+
"name": "__duroxide_syscall:get_kv_value",
483+
"input": json.dumps({"instance_id": instance_id, "key": key}),
484+
})
485+
458486
# ─── Logging (fire-and-forget, delegates to Rust ctx.trace()) ───
459487

460488
def trace_info(self, message: str):

src/client.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,41 @@ impl PyClient {
271271
.map_err(|e: String| pyo3::exceptions::PyRuntimeError::new_err(e))
272272
}
273273

274+
/// Read a single KV entry for an orchestration instance.
275+
fn get_value(
276+
&self,
277+
py: Python<'_>,
278+
instance_id: String,
279+
key: String,
280+
) -> PyResult<Option<String>> {
281+
let client = self.inner.clone();
282+
py.allow_threads(|| {
283+
TOKIO_RT.block_on(async { client.get_value(&instance_id, &key).await.map_err(|e| format!("{e}")) })
284+
})
285+
.map_err(|e: String| pyo3::exceptions::PyRuntimeError::new_err(e))
286+
}
287+
288+
/// Wait for a KV entry to become available on an orchestration instance.
289+
fn wait_for_value(
290+
&self,
291+
py: Python<'_>,
292+
instance_id: String,
293+
key: String,
294+
timeout_ms: u64,
295+
) -> PyResult<String> {
296+
let client = self.inner.clone();
297+
let timeout = Duration::from_millis(timeout_ms);
298+
py.allow_threads(|| {
299+
TOKIO_RT.block_on(async {
300+
client
301+
.wait_for_value(&instance_id, &key, timeout)
302+
.await
303+
.map_err(|e| format!("{e}"))
304+
})
305+
})
306+
.map_err(|e: String| pyo3::exceptions::PyRuntimeError::new_err(e))
307+
}
308+
274309
/// Get system metrics (if provider supports management).
275310
fn get_system_metrics(&self, py: Python<'_>) -> PyResult<PySystemMetrics> {
276311
let client = self.inner.clone();

src/handlers.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,36 @@ pub fn orchestration_get_custom_status(instance_id: &str) -> Option<String> {
139139
map.get(instance_id).and_then(|ctx| ctx.get_custom_status())
140140
}
141141

142+
/// Called from Python to set a KV value on the OrchestrationContext.
143+
pub fn orchestration_set_value(instance_id: &str, key: &str, value: &str) {
144+
let map = ORCHESTRATION_CTXS.lock();
145+
if let Some(ctx) = map.get(instance_id) {
146+
ctx.set_value(key, value);
147+
}
148+
}
149+
150+
/// Called from Python to read a KV value from the OrchestrationContext.
151+
pub fn orchestration_get_value(instance_id: &str, key: &str) -> Option<String> {
152+
let map = ORCHESTRATION_CTXS.lock();
153+
map.get(instance_id).and_then(|ctx| ctx.get_value(key))
154+
}
155+
156+
/// Called from Python to clear a single KV value on the OrchestrationContext.
157+
pub fn orchestration_clear_value(instance_id: &str, key: &str) {
158+
let map = ORCHESTRATION_CTXS.lock();
159+
if let Some(ctx) = map.get(instance_id) {
160+
ctx.clear_value(key);
161+
}
162+
}
163+
164+
/// Called from Python to clear all KV values on the OrchestrationContext.
165+
pub fn orchestration_clear_all_values(instance_id: &str) {
166+
let map = ORCHESTRATION_CTXS.lock();
167+
if let Some(ctx) = map.get(instance_id) {
168+
ctx.clear_all_values();
169+
}
170+
}
171+
142172
// ─── Activity Bridge ─────────────────────────────────────────────
143173

144174
/// Wraps a Python callable as a Rust ActivityHandler.

0 commit comments

Comments
 (0)