Merged
12 changes: 8 additions & 4 deletions .github/workflows/ci.yml
@@ -31,7 +31,7 @@ jobs:
cd styx-package && python setup.py build_ext --inplace && cd ..
python -m pip install styx-package/
python -m pip install -r coordinator/requirements.txt
python -m pip install pytest==9.0.2 pytest-cov pytest-asyncio
python -m pip install pytest==9.0.3 pytest-cov pytest-asyncio

- name: Run coordinator unit tests
run: pytest tests/unit/coordinator/ -v --cov=coordinator --cov-report=xml:coverage-coordinator.xml
@@ -43,6 +43,7 @@ jobs:
flags: coordinator
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
use_pypi: true

unit-tests-worker:
name: Unit tests (worker)
@@ -63,7 +64,7 @@ jobs:
python -m pip install --upgrade pip setuptools Cython
cd styx-package && python setup.py build_ext --inplace && cd ..
python -m pip install styx-package/
python -m pip install msgspec pytest==9.0.2 pytest-cov pytest-asyncio psutil
python -m pip install msgspec pytest==9.0.3 pytest-cov pytest-asyncio psutil

- name: Build Cython extensions
run: python worker/setup.py build_ext --inplace
@@ -78,6 +79,7 @@ jobs:
flags: worker
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
use_pypi: true

unit-tests-styx-package:
name: Unit tests (styx-package)
@@ -98,7 +100,7 @@ jobs:
python -m pip install --upgrade pip setuptools Cython
cd styx-package && python setup.py build_ext --inplace && cd ..
python -m pip install -e styx-package/
python -m pip install pytest==9.0.2 pytest-cov pytest-asyncio
python -m pip install pytest==9.0.3 pytest-cov pytest-asyncio

- name: Run styx-package unit tests
run: pytest tests/unit/styx_package/ -v --cov=styx-package/styx --cov-report=xml:coverage-styx-package.xml
@@ -110,6 +112,7 @@ jobs:
flags: styx-package
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
use_pypi: true

integration-tests:
name: Integration tests
@@ -132,7 +135,7 @@ jobs:
cd styx-package && python setup.py build_ext --inplace && cd ..
python -m pip install styx-package/
python worker/setup.py build_ext --inplace
python -m pip install pytest==9.0.2 pytest-asyncio pytest-cov "testcontainers[kafka,minio]>=4.14.1"
python -m pip install pytest==9.0.3 pytest-asyncio pytest-cov "testcontainers[kafka,minio]>=4.14.2"

- name: Run integration tests
run: pytest tests/integration/ -v --cov=styx-package/styx --cov=coordinator --cov=worker --cov-report=xml:coverage-integration.xml
@@ -144,3 +147,4 @@ jobs:
flags: integration
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
use_pypi: true
1 change: 1 addition & 0 deletions .gitignore
@@ -2,6 +2,7 @@
__pycache__/
*.py[cod]
*$py.class
.DS_Store

# C extensions
*.so
18 changes: 11 additions & 7 deletions README.md
@@ -38,19 +38,19 @@ pip install -r requirements.txt
* [`coordinator`](https://github.com/delftdata/styx/tree/main/coordinator)
Styx coordinator.

* [`demo`](https://github.com/delftdata/styx/tree/main/benchmark)
* [`demo`](https://github.com/delftdata/styx/tree/main/demo)
The YCSB-T, Deathstar, TPC-C and scalability benchmarks we used for the experiments.

* [`grafana`](https://github.com/delftdata/styx/tree/main/grafana)
The confinguration files for the deployment of our visualization dashboards.
The configuration files for the deployment of our visualization dashboards.

* [`styx-package`](https://github.com/delftdata/styx/tree/main/styx-package)
The Styx framework Python package.

* [`tests`](https://github.com/delftdata/styx/tree/main/tests)
Tests for the worker components of Styx.

* [`worker`](https://github.com/delftdata/styx/tree/main/styx-package)
* [`worker`](https://github.com/delftdata/styx/tree/main/worker)
Styx worker.

## Container images
@@ -162,19 +162,23 @@ To clear kafka: `docker compose -f docker-compose-kafka.yml down --volumes`

### S3 (Rustfs)

To run self-hosted S3: `docker-compose up -f docker-compose-s3.yml up`
To run self-hosted S3: `docker compose -f docker-compose-s3.yml up`

To clear self-hosted S3: `docker-compose -f docker-compose-s3.yml down --volumes`
To clear self-hosted S3: `docker compose -f docker-compose-s3.yml down --volumes`

---

Then, you can start the Styx engine and specify the desired scale.

### Styx Engine

To run the SE: `docker-compose up --build --scale worker=4`
To run the SE: `docker compose up --build --scale worker=4`

To clear the SE: `docker-compose down --volumes`
The worker service in `docker-compose.yml` sets `INGRESS_TYPE=KAFKA`. If you
start workers manually outside Docker Compose, set `INGRESS_TYPE=KAFKA` together
with the Kafka, coordinator, and S3-compatible storage environment variables.

To clear the SE: `docker compose down --volumes`

##### Cite Styx

166 changes: 80 additions & 86 deletions docs/styx-docs/docker-env-variables.md
@@ -1,119 +1,113 @@
# Environment Variables

## Styx Coordinator
This page lists the environment variables read by Styx runtime components. Many values are read when Python modules are imported, so set them before starting the coordinator, workers, clients, or tests.

The following environment variables configure the behavior of the **Styx Coordinator**, including heartbeats, Kafka settings, snapshotting, and object storage.
Boolean values are parsed with Python's `strtobool`, so values such as `true`, `false`, `1`, `0`, `yes`, and `no` are accepted.
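
As a rough illustration of the `strtobool`-style parsing described above, the sketch below reads a boolean flag from the environment. The helper name `env_flag` is hypothetical and not part of Styx. The accepted spellings follow the classic `distutils.util.strtobool` helper, and the whitespace stripping is an added convenience; Styx's own parsing may differ in detail.

```python
import os

# Spellings accepted by the classic distutils `strtobool` helper (case-insensitive).
_TRUE = {"y", "yes", "t", "true", "on", "1"}
_FALSE = {"n", "no", "f", "false", "off", "0"}


def env_flag(name: str, default: str = "true") -> bool:
    """Hypothetical helper: read an environment variable as a boolean."""
    raw = os.environ.get(name, default).strip().lower()
    if raw in _TRUE:
        return True
    if raw in _FALSE:
        return False
    # Anything else is rejected rather than silently treated as false.
    raise ValueError(f"invalid truth value for {name}: {raw!r}")
```

Because many values are read at import time, a call such as `env_flag("ENABLE_COMPRESSION")` only reflects variables that were set before the process started.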

---
Styx supports S3-compatible object storage for snapshots. The local Docker Compose setup uses
RustFS, but any compatible endpoint can be used with the `S3_*` variables.

### 🧭 Core Configuration
## Required Runtime Config

| Variable | Default Value | Description |
|---------------------------|-------------------------|---------------------------------------------|
| `KAFKA_URL` | `KAFKA_HOST:KAFKA_PORT` | Kafka bootstrap server for messaging |
| `HEARTBEAT_LIMIT` | `5000` (ms) | Max time before a worker is considered dead |
| `HEARTBEAT_CHECK_INTERVAL` | `500` (ms) | How often to check worker heartbeats |
| `MAX_OPERATOR_PARALLELISM` | `10` | Max number of operator partitions |
| `PROTOCOL` | `Protocols.Aria` | Transaction execution protocol used by Styx |
These variables are required for a usual Kafka-backed Styx deployment.

---
| Variable | Component | Default | Description |
|----------|-----------|---------|-------------|
| `KAFKA_URL` | Coordinator, worker | Required | Kafka bootstrap server used for metadata, ingress, and egress topics. |
| `S3_ENDPOINT` | Coordinator, worker | Required | Full URL for the S3-compatible object store. |
| `S3_ACCESS_KEY` | Coordinator, worker | Required | Access key for the S3-compatible object store. |
| `S3_SECRET_KEY` | Coordinator, worker | Required | Secret key for the S3-compatible object store. |
| `DISCOVERY_HOST` | Worker | Required | Coordinator hostname or IP used by workers. |
| `DISCOVERY_PORT` | Worker | Required | Coordinator control-plane port used by workers. |
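
A pre-flight check for the required variables in the table above could look like the following sketch. The function name `missing_required` and the grouping constants are hypothetical; only the variable names and their component assignments come from the table.

```python
import os

# Required by both coordinator and worker, per the table above.
REQUIRED_COMMON = ("KAFKA_URL", "S3_ENDPOINT", "S3_ACCESS_KEY", "S3_SECRET_KEY")
# Required by workers only.
REQUIRED_WORKER_ONLY = ("DISCOVERY_HOST", "DISCOVERY_PORT")


def missing_required(component: str, env=None) -> list[str]:
    """Hypothetical helper: list required variables that are unset or empty."""
    env = os.environ if env is None else env
    names = REQUIRED_COMMON
    if component == "worker":
        names = names + REQUIRED_WORKER_ONLY
    return [name for name in names if not env.get(name)]
```

Running such a check before launching a process surfaces a missing variable early instead of at module import.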

### 🪣 Snapshot & State
## Common Execution Config

| Variable | Default Value | Description |
|------------------------------------|------------------------|----------------------------------------------|
| `SNAPSHOT_BUCKET_NAME` | `styx-snapshots` | S3/MinIO bucket for storing snapshots |
| `SNAPSHOT_FREQUENCY_SEC` | `10` (seconds) | How often to take a snapshot |
| `SNAPSHOT_COMPACTION_INTERVAL_SEC`| `10` (seconds) | Interval for compacting snapshots |
These variables affect shared Styx package behavior and should be kept consistent across the processes that participate in the same cluster.

---
| Variable | Default | Description |
|----------|---------|-------------|
| `ENABLE_COMPRESSION` | `true` | Compress MessagePack-serialized internal TCP messages larger than `COMPRESS_AFTER` with Zstandard. |
| `COMPRESS_AFTER` | `4096` | Serialized message size, in bytes, above which `ENABLE_COMPRESSION` can apply. |
| `USE_COMPOSITE_KEYS` | `true` | Honor operator `composite_key_hash_params` during partitioning. When disabled, Styx hashes the full key. |
| `SNAPSHOT_BUCKET_NAME` | `styx-snapshots` | Bucket used for worker, coordinator, and compactor snapshots. |
| `S3_REGION` | `us-east-1` | Region passed to the S3 client. |
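
The interaction between `ENABLE_COMPRESSION` and `COMPRESS_AFTER` can be sketched as a size-gated compression step. This is an illustration under stated assumptions, not Styx's implementation: `maybe_compress` is a hypothetical name, and `zlib` is used as a stand-in for Zstandard so the example runs on the standard library alone.

```python
import zlib


def maybe_compress(payload: bytes, enabled: bool = True,
                   compress_after: int = 4096) -> tuple[bool, bytes]:
    """Hypothetical helper: compress only payloads larger than the threshold.

    Returns (was_compressed, data). zlib stands in for Zstandard here.
    """
    if enabled and len(payload) > compress_after:
        return True, zlib.compress(payload)
    # Small payloads, or compression disabled: send bytes unchanged.
    return False, payload
```

The returned flag matters because the receiver needs to know whether to decompress. All processes in a cluster should therefore agree on these settings.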

### ⚙️ MinIO / Object Storage
## Coordinator Config

| Variable | Source | Description |
|-----------------------|----------------|------------------------------------|
| `S3_ENDPOINT` | Required | Full URL to connect to S3/MinIO |
| `S3_ACCESS_KEY` | Required | Access key for the S3/MinIO user |
| `S3_SECRET_KEY` | Required | Secret key for the S3/MinIO user |
| `S3_REGION` | `us-east-1` | S3 region for the client |
| `S3_INIT_RETRY_SEC` | `2` (seconds) | Sleep time between bucket init retries |
| `S3_INIT_MAX_RETRIES`| `30` | Max retry attempts before coordinator exits (0 = infinite) |
These variables configure the coordinator process.

---
### Kafka And Heartbeats

### 🛡️ Fault Tolerance & Restart
| Variable | Default | Description |
|----------|---------|-------------|
| `HEARTBEAT_LIMIT` | `5000` | Time in milliseconds before a worker is considered unhealthy. |
| `HEARTBEAT_CHECK_INTERVAL` | `1000` | Time in milliseconds between coordinator heartbeat checks. |
| `KAFKA_REPLICATION_FACTOR` | `3` | Replication factor used when the coordinator creates Styx Kafka topics. |
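
To make the heartbeat settings concrete, the sketch below shows one way a periodic check could flag workers whose last heartbeat is older than `HEARTBEAT_LIMIT`. The function name and the shape of the bookkeeping dictionary are assumptions for illustration; the coordinator's real data structures are not shown in this document.

```python
def unhealthy_workers(last_heartbeat_ms: dict[int, float], now_ms: float,
                      heartbeat_limit_ms: int = 5000) -> list[int]:
    """Hypothetical check: worker ids whose last heartbeat exceeds the limit.

    `last_heartbeat_ms` maps a worker id to the timestamp (in ms) of its
    most recent heartbeat; this mapping is assumed, not taken from Styx.
    """
    return sorted(
        worker_id
        for worker_id, seen_at in last_heartbeat_ms.items()
        if now_ms - seen_at > heartbeat_limit_ms
    )
```

With the defaults listed above, such a check would run every 1000 ms, and a worker sending heartbeats every 500 ms would have to miss roughly ten in a row before being flagged.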

| Variable | Default Value | Description |
|----------------------------------|----------------|------------------------------------------------------------------------------------------------------|
| `MAX_WAIT_FOR_RESTARTS_SEC` | `0` (seconds) | How long to wait for the failed container(s) to restart before Styx initiates the automatic recovery |
### Snapshots And Object Storage

---
## Styx Worker
| Variable | Default | Description |
|----------|---------|-------------|
| `SNAPSHOT_FREQUENCY_SEC` | `30` | Time in seconds between coordinator snapshot attempts. |
| `COMPACT_SNAPSHOTS` | `false` | Whether the coordinator should trigger snapshot compaction after a new completed snapshot. |
| `S3_INIT_RETRY_SEC` | `2` | Time in seconds to wait between object-store initialization retries. |
| `S3_INIT_MAX_RETRIES` | `30` | Maximum object-store initialization attempts before exiting. Set to `0` to retry forever. |
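
The retry semantics of `S3_INIT_RETRY_SEC` and `S3_INIT_MAX_RETRIES` can be sketched as a bounded retry loop in which `0` means "retry forever". The function `init_with_retries` and its `init` callback are hypothetical. The sketch re-raises the last error where the table says the process exits.

```python
import time
from typing import Callable


def init_with_retries(init: Callable[[], None], retry_sec: float = 2.0,
                      max_retries: int = 30, sleep=time.sleep) -> int:
    """Hypothetical helper: call `init` until it succeeds.

    Returns the number of attempts used. `max_retries == 0` retries forever.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            init()
            return attempt
        except Exception:
            # Give up once the attempt budget is exhausted (unless unlimited).
            if max_retries != 0 and attempt >= max_retries:
                raise
            sleep(retry_sec)
```

With the defaults, this waits up to about a minute for the object store to become reachable, which is usually enough for a container started alongside the coordinator.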

These environment variables configure the **Styx Worker**, including discovery, parallelism, heartbeat, snapshotting, and conflict resolution.
### Recovery

---
| Variable | Default | Description |
|----------|---------|-------------|
| `MAX_WAIT_FOR_RESTARTS_SEC` | `0` | Time in seconds to wait for failed workers to restart before recovery begins. |

### 🧭 Discovery & Coordination
## Worker Config

| Variable | Required / Default | Description |
|----------------------|--------------------|--------------------------------------|
| `DISCOVERY_HOST` | Required | Hostname or IP of the Coordinator |
| `DISCOVERY_PORT` | Required | Port used to communicate with Coordinator |
These variables configure worker processes and the Aria execution protocol.

---
### Discovery, Ingress, And Heartbeats

### ⚙️ Kafka & Heartbeat
| Variable | Default | Description |
|----------|---------|-------------|
| `INGRESS_TYPE` | None | Ingress backend. Set to `KAFKA` for Kafka topic assignment. The local Docker Compose deployment already sets this for workers. |
| `HEARTBEAT_INTERVAL` | `500` | Time in milliseconds between worker heartbeats. |
| `WORKER_THREADS` | `1` | Number of Styx worker processes started inside a worker container. |

| Variable | Default Value | Description |
|--------------------|----------------|--------------------------------------------|
| `KAFKA_URL` | Required | Kafka broker address |
| `HEARTBEAT_INTERVAL` | `500` (ms) | Frequency at which the worker sends heartbeats |
### Epochs, Kafka, And Egress

---
| Variable | Default | Description |
|----------|---------|-------------|
| `SEQUENCE_MAX_SIZE` | `1000` | Maximum number of transactions sequenced in one Aria epoch. |
| `EPOCH_INTERVAL_MS` | `1` | Kafka egress polling interval in milliseconds while draining outputs during recovery. |

### 🧵 Parallelism & Threads
### Conflict Detection And Fallback

| Variable | Default Value | Description |
|---------------------|----------------|--------------------------------------------------------------|
| `WORKER_THREADS` (`N_THREADS`) | `1` | Number of Styx workers within the container |
| `SNAPSHOTTING_THREADS` | `4` | Threads dedicated to snapshotting |
| Variable | Default | Description |
|----------|---------|-------------|
| `CONFLICT_DETECTION_METHOD` | `0` | Aria conflict detection mode: `0` serializable, `1` deterministic reordering, `2` snapshot isolation. |
| `FALLBACK_STRATEGY_PERCENTAGE` | `-0.1` | Abort-rate threshold for running fallback. The default negative value enables fallback whenever an epoch has aborts. |
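
One reading of the `FALLBACK_STRATEGY_PERCENTAGE` description is sketched below. It assumes the threshold is compared against the abort rate as a fraction of the epoch's transactions, and that an epoch without aborts never runs fallback. Both points, and the function name, are assumptions made for illustration rather than confirmed Styx behavior.

```python
def should_run_fallback(aborted: int, total: int, threshold: float = -0.1) -> bool:
    """Hypothetical decision: run fallback when the abort rate exceeds the threshold.

    Assumes `threshold` is a fraction (0.05 means 5% of the epoch aborted).
    """
    if aborted == 0 or total == 0:
        # Nothing aborted, so there is nothing for fallback to re-run.
        return False
    return aborted / total > threshold
```

Under this reading, the default of `-0.1` is below any possible abort rate, so fallback runs for every epoch that has at least one abort.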

---
### Snapshotting And Migration

### 🪣 Snapshotting
| Variable | Default | Description |
|----------|---------|-------------|
| `SNAPSHOTTING_THREADS` | `4` | Number of threads used by the Aria snapshotting executor. |
| `MIGRATION_THREADS` | `4` | Number of processes used for worker-side migration/repartitioning work. |
| `USE_ASYNC_MIGRATION` | `true` | Whether workers send migrating state asynchronously while the protocol is running after migration restart. |
| `ASYNC_MIGRATION_BATCH_SIZE` | `2000` | Maximum number of state items included in each async migration batch. |

| Variable | Default Value | Description |
|------------------------|--------------------|---------------------------------------------|
| `SNAPSHOT_BUCKET_NAME` | `styx-snapshots` | Bucket where snapshots are stored |
| `SNAPSHOT_FREQUENCY` | `10` (seconds) | Snapshot frequency in epochs |
## Advanced Networking And Queues

---
These variables tune socket buffers, socket pooling, and worker queue backpressure. They are primarily useful when profiling high-throughput deployments.

### 📦 Object Storage (MinIO)
| Variable | Default | Description |
|----------|---------|-------------|
| `SOCKET_SND_BUF` | `4194304` | TCP send buffer size in bytes for Styx sockets. |
| `SOCKET_RCV_BUF` | `4194304` | TCP receive buffer size in bytes for Styx sockets. |
| `SOCKET_POOL_SIZE` | `16` | Number of pooled TCP connections per target `(host, port)` in the Styx networking manager. |
| `PROTOCOL_QUEUE_SIZE` | `10000` | Maximum number of queued protocol-plane messages per worker. |
| `CONTROL_QUEUE_SIZE` | `10000` | Maximum number of queued control-plane messages per worker. |
| `PROTOCOL_WORKERS` | `100` | Number of concurrent protocol queue workers handling protocol-plane messages. |

| Variable | Source | Description |
|---------------------|----------------|------------------------------------|
| `MINIO_URL` | `MINIO_HOST:MINIO_PORT` | Address of the MinIO server |
| `MINIO_ACCESS_KEY` | Required | MinIO access key |
| `MINIO_SECRET_KEY` | Required | MinIO secret key |
## Notes

---

### 📐 Conflict Detection & Strategy

| Variable | Default Value | Description |
|------------------------------|----------------|----------------------------------------------------------------------------|
| `CONFLICT_DETECTION_METHOD` | `0` | Styx's conflict detection strategy |
| `FALLBACK_STRATEGY_PERCENTAGE` | `-0.1` | % aborts before fallback logic triggers (negative enables it at all times) |

---

### ⏱️ Epoch & Sequence Control

| Variable | Default Value | Description |
|----------------------|--------------------|---------------------------------------------------|
| `EPOCH_INTERVAL_MS` | `1` (ms) | Kafka polling rate |
| `SEQUENCE_MAX_SIZE` | `1000` | Max size of a transactional epoch per Styx worker |

---
`Protocols.Aria` is currently hardcoded and is not configured through an
environment variable.
23 changes: 19 additions & 4 deletions docs/styx-docs/quickstart.md
@@ -7,7 +7,7 @@ local runner so that you don't have to deploy a Styx cluster for debugging and s

Requirements:

- A `Python 3.13` environment
- A `Python 3.14` environment
- `Docker`
- `Docker Compose`

@@ -23,13 +23,28 @@ Install the styx-package:
pip install ./styx-package/
```

The local cluster script starts Kafka, RustFS, the coordinator, and workers with
Docker Compose. It also runs `docker system prune -f --volumes` before starting
the cluster, which removes unused Docker objects and volumes from your Docker
host.

Next start a Styx cluster by calling:


```shell
./scripts/start_styx_cluster.sh [scale_factor] [epoch_size]
./scripts/start_styx_cluster.sh [scale_factor] [epoch_size] [threads_per_worker] [enable_compression] [use_composite_keys]
```

`scale_factor` is how many Styx workers you want deployed and `epoch_size` the size of a transactional epoch in terms of number of transactions.
`scale_factor` is how many Styx workers you want deployed, `epoch_size` is the size of a transactional epoch in terms of number of transactions, and `threads_per_worker` controls how many threads run inside each worker container.

`enable_compression` controls whether Styx compresses large MessagePack-serialized internal TCP messages with Zstandard. Compression is enabled only for messages larger than the `COMPRESS_AFTER` threshold, which defaults to 4096 bytes. Leave this enabled unless you are explicitly comparing compression overhead.

`use_composite_keys` controls whether operators honor `composite_key_hash_params` during partitioning. When enabled, an operator can route a string key by one selected field, for example the first field in `warehouse:district:customer`; when disabled, Styx hashes the full key string. Leave this enabled for workloads such as TPC-C that define composite-key partitioning.

For example, to start four workers with epochs of 1000 transactions, one worker thread per container, compression enabled, and composite keys enabled:

```shell
./scripts/start_styx_cluster.sh 4 1000 1 true true
```
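
The effect of `use_composite_keys` on routing can be illustrated with a small sketch. The function `partition_for`, its parameters, and the use of CRC32 as the hash are all assumptions. The actual shape of `composite_key_hash_params` and the hash function Styx uses are not specified in this document.

```python
import zlib


def partition_for(key: str, n_partitions: int, use_composite_keys: bool = True,
                  field: int = 0, delim: str = ":") -> int:
    """Hypothetical router: choose a partition for a string key.

    With composite keys enabled, only the selected field is hashed, so keys
    that share that field land on the same partition. CRC32 is a stand-in hash.
    """
    routed = key.split(delim)[field] if use_composite_keys else key
    return zlib.crc32(routed.encode("utf-8")) % n_partitions
```

For a TPC-C style key such as `warehouse:district:customer`, hashing only the first field keeps all rows of one warehouse together. Hashing the full string spreads them across partitions.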

Now you are ready to submit your first stateful dataflow graph to the Styx cluster for processing!
Now you are ready to submit your first stateful dataflow graph to the Styx cluster for processing!