Commit 9cbae72
Large payload azure blob externalization support
1 parent 84c4291 commit 9cbae72

25 files changed (+3032, -7 lines)

.github/copilot-instructions.md

Lines changed: 7 additions & 0 deletions
```diff
@@ -89,6 +89,13 @@ pytest
 ## Project Structure
 
 - `durabletask/` — core SDK source
+  - `payload/` — public payload externalization API (`PayloadStore` ABC,
+    `LargePayloadStorageOptions`, helper functions)
+  - `extensions/azure_blob_payloads/` — Azure Blob Storage payload store
+    (installed via `pip install durabletask[azure-blob-payloads]`)
+  - `entities/` — durable entity support
+  - `testing/` — in-memory backend for testing without a sidecar
+  - `internal/` — protobuf definitions, gRPC helpers, tracing (not public API)
 - `durabletask-azuremanaged/` — Azure managed provider source
 - `examples/` — example orchestrations (see `examples/README.md`)
 - `tests/` — test suite
```

.github/workflows/durabletask.yml

Lines changed: 25 additions & 1 deletion
```diff
@@ -47,16 +47,40 @@ jobs:
     steps:
       - name: Checkout repository
         uses: actions/checkout@v4
+
       - name: Set up Python ${{ matrix.python-version }}
         uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
+
+      - name: Set up Node.js (needed for Azurite)
+        uses: actions/setup-node@v4
+        with:
+          node-version: '20.x'
+
+      - name: Cache npm
+        uses: actions/cache@v3
+        with:
+          path: ~/.npm
+          key: ${{ runner.os }}-npm-azurite
+
+      - name: Install Azurite
+        run: npm install -g azurite
+
+      - name: Start Azurite
+        shell: bash
+        run: |
+          azurite-blob --silent --blobPort 10000 &
+          sleep 2
+
       - name: Install durabletask dependencies and the library itself
         run: |
           python -m pip install --upgrade pip
           pip install flake8 pytest
           pip install -r requirements.txt
-          pip install .
+          pip install ".[azure-blob-payloads]"
+          pip install aiohttp
+
       - name: Pytest unit tests
         working-directory: tests/durabletask
         run: |
```
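The CI workflow above starts Azurite in the background and then sleeps for two seconds before the tests run. On a slow runner a fixed sleep can race with Azurite's startup; a minimal readiness poll along these lines is more robust (a sketch only, not part of this commit — the host, port, and timeout values are assumptions matching the `--blobPort 10000` flag above):

```python
import socket
import time


def wait_for_azurite(host: str = "127.0.0.1", port: int = 10000,
                     timeout: float = 30.0) -> bool:
    """Poll until the Azurite blob endpoint accepts TCP connections."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful TCP connect means the blob service is listening.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.25)
    return False
```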

.vscode/mcp.json

Lines changed: 16 additions & 0 deletions
```diff
@@ -0,0 +1,16 @@
+{
+  "inputs": [
+    {
+      "id": "ado_org",
+      "type": "promptString",
+      "description": "msazure"
+    }
+  ],
+  "servers": {
+    "ado": {
+      "type": "stdio",
+      "command": "npx",
+      "args": ["-y", "@azure-devops/mcp", "msazure"]
+    }
+  }
+}
```

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
```diff
@@ -9,6 +9,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ADDED
 
+- Added large payload externalization support for automatically
+  offloading oversized orchestration payloads to Azure Blob Storage.
+  Install with `pip install durabletask[azure-blob-payloads]`.
+  Pass a `BlobPayloadStore` to the worker and client via the
+  `payload_store` parameter.
+- Added `durabletask.extensions.azure_blob_payloads` extension
+  package with `BlobPayloadStore` and `BlobPayloadStoreOptions`
+- Added `PayloadStore` abstract base class in
+  `durabletask.payload` for custom storage backends
 - Added `durabletask.testing` module with `InMemoryOrchestrationBackend` for testing orchestrations without a sidecar process
 - Added `AsyncTaskHubGrpcClient` for asyncio-based applications using `grpc.aio`
 - Added `DefaultAsyncClientInterceptorImpl` for async gRPC metadata interceptors
```

README.md

Lines changed: 14 additions & 0 deletions
```diff
@@ -15,6 +15,20 @@ This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](
 - [Development Guide](./docs/development.md)
 - [Contributing Guide](./CONTRIBUTING.md)
 
+## Optional Features
+
+### Large Payload Externalization
+
+Install the `azure-blob-payloads` extra to automatically offload
+oversized orchestration payloads to Azure Blob Storage:
+
+```bash
+pip install durabletask[azure-blob-payloads]
+```
+
+See the [feature documentation](./docs/features.md#large-payload-externalization)
+and the [example](./examples/large_payload/) for usage details.
+
 ## Trademarks
 This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft
 trademarks or logos is subject to and must follow
```

docs/features.md

Lines changed: 145 additions & 0 deletions
```diff
@@ -150,6 +150,151 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and
 
 Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
 
+### Large payload externalization
+
+Orchestration inputs, outputs, and event data are transmitted through
+gRPC messages. When these payloads become very large they can exceed
+gRPC message size limits or degrade performance. Large payload
+externalization solves this by transparently offloading oversized
+payloads to an external store (such as Azure Blob Storage) and
+replacing them with compact reference tokens in the gRPC messages.
+
+This feature is **opt-in** and requires installing an optional
+dependency:
+
+```bash
+pip install durabletask[azure-blob-payloads]
+```
+
+#### How it works
+
+1. When the worker or client sends a payload that exceeds the
+   configured threshold (default 900 KB), the payload is
+   compressed (GZip, enabled by default) and uploaded to the
+   external store.
+2. The original payload in the gRPC message is replaced with a
+   compact token (e.g. `blob:v1:<container>:<blobName>`).
+3. When the worker or client receives a message containing a token,
+   it downloads and decompresses the original payload automatically.
+
+This process is fully transparent to orchestrator and activity code —
+no changes are needed in your workflow logic.
+
```
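The send-side and receive-side steps in the new documentation can be sketched roughly as follows. This is a simplified illustration, not the SDK's actual implementation; the 900 KB threshold and GZip compression match the documented defaults, and the `store` object is assumed to follow the documented `PayloadStore` interface (`upload`, `download`, `is_known_token`):

```python
import gzip

THRESHOLD_BYTES = 900_000  # documented default threshold (900 KB)


def maybe_externalize(payload: str, store) -> str:
    """Send side: upload an oversized payload and return a compact token."""
    raw = payload.encode("utf-8")
    if len(raw) <= THRESHOLD_BYTES:
        return payload  # small payloads travel inline in the gRPC message
    return store.upload(gzip.compress(raw))


def maybe_resolve(value: str, store) -> str:
    """Receive side: download and decompress when given a known token."""
    if not store.is_known_token(value):
        return value  # ordinary inline payload, pass through unchanged
    return gzip.decompress(store.download(value)).decode("utf-8")
```

Small payloads pass through both functions untouched, which is why orchestrator and activity code never needs to change.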
```diff
+#### Configuring the blob payload store
+
+The built-in `BlobPayloadStore` uses Azure Blob Storage. Create a
+store instance and pass it to both the worker and client:
+
+```python
+from durabletask.extensions.azure_blob_payloads import BlobPayloadStore, BlobPayloadStoreOptions
+
+options = BlobPayloadStoreOptions(
+    connection_string="DefaultEndpointsProtocol=https;...",
+    container_name="durabletask-payloads",  # default
+    threshold_bytes=900_000,                # default (900 KB)
+    max_stored_payload_bytes=10_485_760,    # default (10 MB)
+    enable_compression=True,                # default
+)
+store = BlobPayloadStore(options)
+```
+
+Then pass the store to the worker and client:
+
+```python
+with DurableTaskSchedulerWorker(
+    host_address=endpoint,
+    secure_channel=secure_channel,
+    taskhub=taskhub_name,
+    token_credential=credential,
+    payload_store=store,
+) as w:
+    # ... register orchestrators and activities ...
+    w.start()
+
+    c = DurableTaskSchedulerClient(
+        host_address=endpoint,
+        secure_channel=secure_channel,
+        taskhub=taskhub_name,
+        token_credential=credential,
+        payload_store=store,
+    )
+```
+
+You can also authenticate using `account_url` and a
+`TokenCredential` instead of a connection string:
+
+```python
+from azure.identity import DefaultAzureCredential
+
+options = BlobPayloadStoreOptions(
+    account_url="https://<account>.blob.core.windows.net",
+    credential=DefaultAzureCredential(),
+)
+store = BlobPayloadStore(options)
+```
+
+#### Configuration options
+
+| Option | Default | Description |
+|---|---|---|
+| `threshold_bytes` | 900,000 (900 KB) | Payloads larger than this are externalized |
+| `max_stored_payload_bytes` | 10,485,760 (10 MB) | Maximum size for externalized payloads |
+| `enable_compression` | `True` | GZip-compress payloads before uploading |
+| `container_name` | `"durabletask-payloads"` | Azure Blob container name |
+| `connection_string` | `None` | Azure Storage connection string |
+| `account_url` | `None` | Azure Storage account URL (use with `credential`) |
+| `credential` | `None` | `TokenCredential` for token-based auth |
+
```
```diff
+#### Cross-SDK compatibility
+
+The blob token format (`blob:v1:<container>:<blobName>`) is
+compatible with the .NET Durable Task SDK, enabling
+interoperability between Python and .NET workers sharing the same
+task hub and storage account. Note that message serialization strategies
+may differ for complex objects and custom types.
+
```
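For readers who need to interoperate with the token format documented above, the `blob:v1:<container>:<blobName>` shape can be split with a small parser. The helper name here is hypothetical (not an SDK API), and it assumes the container segment contains no `:` while the blob name keeps any remaining characters, including `/`:

```python
import re

# Pattern for the documented token format `blob:v1:<container>:<blobName>`.
_TOKEN_RE = re.compile(r"^blob:v1:(?P<container>[^:]+):(?P<blob_name>.+)$")


def parse_blob_token(value: str):
    """Return (container, blob_name) for a valid token, else None."""
    match = _TOKEN_RE.match(value)
    if match is None:
        return None
    return match.group("container"), match.group("blob_name")
```

Any payload that does not match the pattern (for example, an ordinary inline JSON string) is simply treated as data, not a token.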
```diff
+#### Custom payload stores
+
+You can implement a custom payload store by subclassing
+`PayloadStore` from `durabletask.payload` and implementing
+the `upload`, `upload_async`, `download`, `download_async`, and
+`is_known_token` methods:
+
+```python
+from durabletask.payload import PayloadStore, LargePayloadStorageOptions
+
+
+class MyPayloadStore(PayloadStore):
+
+    def __init__(self, options: LargePayloadStorageOptions):
+        self._options = options
+
+    @property
+    def options(self) -> LargePayloadStorageOptions:
+        return self._options
+
+    def upload(self, data: bytes) -> str:
+        # Store data and return a unique token string
+        ...
+
+    async def upload_async(self, data: bytes) -> str:
+        ...
+
+    def download(self, token: str) -> bytes:
+        # Retrieve data by token
+        ...
+
+    async def download_async(self, token: str) -> bytes:
+        ...
+
+    def is_known_token(self, value: str) -> bool:
+        # Return True if the value looks like a token from this store
+        ...
+```
+
+See the [large payload example](../examples/large_payload/) for a
+complete working sample.
+
```
```diff
 ### Logging configuration
 
 Both the TaskHubGrpcWorker and TaskHubGrpcClient (as well as DurableTaskSchedulerWorker and DurableTaskSchedulerClient for durabletask-azuremanaged) accept a log_handler and log_formatter object from `logging`. These can be used to customize verbosity, output location, and format of logs emitted by these sources.
```
