Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [eager_wf_start](eager_wf_start) - Run a workflow using Eager Workflow Start
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [env_config](env_config) - Load client configuration from TOML files with programmatic overrides.
* [external_storage_redis](external_storage_redis) - Store large payloads in Redis using Temporal external storage.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow.
* [langchain](langchain) - Orchestrate workflows for LangChain.
Expand Down
132 changes: 132 additions & 0 deletions external_storage_redis/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Redis External Storage

This sample packages a Redis-backed `StorageDriver` implementation for Temporal
external storage.

The code lives in:

* `external_storage_redis/_driver.py` for the `RedisStorageDriver`
* `external_storage_redis/_client.py` for the storage client abstraction
* `external_storage_redis/redis_asyncio.py` for the `redis.asyncio` adapter
* `tests/external_storage_redis/` for unit and worker integration tests

Unlike most samples in this repository, this one is primarily reusable driver
code plus tests rather than a standalone `worker.py` / `starter.py` pair.

## Install Dependencies

From the repository root:

uv sync --group external-storage-redis --group dev

The `external-storage-redis` group installs `redis`, and the `dev` group
installs `fakeredis` for the test suite.

## Using The Driver

```python
import dataclasses

import redis.asyncio as redis
import temporalio.converter
from temporalio.client import Client
from temporalio.converter import ExternalStorage

from external_storage_redis import RedisStorageDriver
from external_storage_redis.redis_asyncio import new_redis_asyncio_client

redis_client = redis.Redis.from_url(
"redis://localhost:6379/0",
decode_responses=False,
)
try:
driver = RedisStorageDriver(
client=new_redis_asyncio_client(redis_client),
key_prefix="temporalio:payloads",
)

client = await Client.connect(
"localhost:7233",
data_converter=dataclasses.replace(
temporalio.converter.default(),
external_storage=ExternalStorage(
drivers=[driver],
payload_size_threshold=256 * 1024,
),
),
)
finally:
await redis_client.aclose()
```

`decode_responses=False` is required because the driver stores serialized
Temporal `Payload` protobuf bytes as Redis values rather than text.

## Driver Behavior

`RedisStorageDriver` accepts these constructor options:

* `driver_name`: defaults to `"redis"`
* `key_prefix`: defaults to `"temporalio:payloads"`
* `ttl`: optional expiration applied only when a key is first inserted
* `max_payload_size`: defaults to 50 MiB

Stored keys are content-addressed using SHA-256 and include Temporal execution
context when it is available. A typical workflow-scoped key looks like:

temporalio:payloads:v0:ns:default:wt:MyWorkflow:wi:my-workflow-id:ri:my-run-id:d:sha256:<hash>

Some behavior to be aware of:

* Any driver used to store payloads must also be configured on the component
that retrieves them.
* The Redis instance must already exist; the driver does not provision it.
* Identical serialized bytes within the same namespace and workflow/activity
scope share the same Redis key.
* Workflow, activity, namespace, and run identifiers are URL-encoded before
being placed into the key.
* Only payloads at or above `ExternalStorage.payload_size_threshold` are
offloaded.
* If `ttl` is set, duplicate stores do not refresh expiration.
* If a payload key is missing at retrieval time, the driver raises a
non-retryable `ApplicationError`.

## Custom Redis Clients

To use a Redis library other than `redis.asyncio`, implement
`RedisStorageDriverClient`:

```python
from datetime import timedelta

from external_storage_redis import RedisStorageDriverClient


class MyRedisClient(RedisStorageDriverClient):
async def get(self, *, key: str) -> bytes | None: ...

async def set_if_absent(
self,
*,
key: str,
data: bytes,
ttl: timedelta | None = None,
) -> bool: ...
```

## Tests

Run the full Redis sample test suite with:

uv run pytest tests/external_storage_redis

Run only the in-memory unit tests with:

uv run pytest tests/external_storage_redis/test_redis.py

The worker integration tests use `WorkflowEnvironment.start_local()` and
`fakeredis`. They do not require a real Redis server, but the first run may
download a Temporal dev-server binary.

Some Temporal dev-server builds disable standalone activity execution. When
that happens, the two standalone-activity integration tests skip automatically.
9 changes: 9 additions & 0 deletions external_storage_redis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Redis storage driver sample for Temporal external storage."""

from external_storage_redis._client import RedisStorageDriverClient
from external_storage_redis._driver import RedisStorageDriver

__all__ = [
"RedisStorageDriverClient",
"RedisStorageDriver",
]
34 changes: 34 additions & 0 deletions external_storage_redis/_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Redis storage driver client abstraction."""

from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import timedelta


class RedisStorageDriverClient(ABC):
"""Abstract base class for the Redis operations used by the driver."""

@abstractmethod
async def get(self, *, key: str) -> bytes | None:
"""Return the raw bytes stored for *key*, or ``None`` if absent."""

@abstractmethod
async def set_if_absent(
self,
*,
key: str,
data: bytes,
ttl: timedelta | None = None,
) -> bool:
"""Store *data* under *key* only if the key does not already exist.

Args:
key: Redis key to store.
data: Serialized payload bytes.
ttl: Optional expiration to apply only when the value is inserted.

Returns:
``True`` if the value was inserted, ``False`` if the key already
existed.
"""
Loading
Loading