diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dc90d1c7..ae385843 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,7 +31,7 @@ jobs: cd styx-package && python setup.py build_ext --inplace && cd .. python -m pip install styx-package/ python -m pip install -r coordinator/requirements.txt - python -m pip install pytest==9.0.2 pytest-cov pytest-asyncio + python -m pip install pytest==9.0.3 pytest-cov pytest-asyncio - name: Run coordinator unit tests run: pytest tests/unit/coordinator/ -v --cov=coordinator --cov-report=xml:coverage-coordinator.xml @@ -43,6 +43,7 @@ jobs: flags: coordinator token: ${{ secrets.CODECOV_TOKEN }} fail_ci_if_error: true + use_pypi: true unit-tests-worker: name: Unit tests (worker) @@ -63,7 +64,7 @@ jobs: python -m pip install --upgrade pip setuptools Cython cd styx-package && python setup.py build_ext --inplace && cd .. python -m pip install styx-package/ - python -m pip install msgspec pytest==9.0.2 pytest-cov pytest-asyncio psutil + python -m pip install msgspec pytest==9.0.3 pytest-cov pytest-asyncio psutil - name: Build Cython extensions run: python worker/setup.py build_ext --inplace @@ -78,6 +79,7 @@ jobs: flags: worker token: ${{ secrets.CODECOV_TOKEN }} fail_ci_if_error: true + use_pypi: true unit-tests-styx-package: name: Unit tests (styx-package) @@ -98,7 +100,7 @@ jobs: python -m pip install --upgrade pip setuptools Cython cd styx-package && python setup.py build_ext --inplace && cd .. 
python -m pip install -e styx-package/ - python -m pip install pytest==9.0.2 pytest-cov pytest-asyncio + python -m pip install pytest==9.0.3 pytest-cov pytest-asyncio - name: Run styx-package unit tests run: pytest tests/unit/styx_package/ -v --cov=styx-package/styx --cov-report=xml:coverage-styx-package.xml @@ -110,6 +112,7 @@ jobs: flags: styx-package token: ${{ secrets.CODECOV_TOKEN }} fail_ci_if_error: true + use_pypi: true integration-tests: name: Integration tests @@ -132,7 +135,7 @@ jobs: cd styx-package && python setup.py build_ext --inplace && cd .. python -m pip install styx-package/ python worker/setup.py build_ext --inplace - python -m pip install pytest==9.0.2 pytest-asyncio pytest-cov "testcontainers[kafka,minio]>=4.14.1" + python -m pip install pytest==9.0.3 pytest-asyncio pytest-cov "testcontainers[kafka,minio]>=4.14.2" - name: Run integration tests run: pytest tests/integration/ -v --cov=styx-package/styx --cov=coordinator --cov=worker --cov-report=xml:coverage-integration.xml @@ -144,3 +147,4 @@ jobs: flags: integration token: ${{ secrets.CODECOV_TOKEN }} fail_ci_if_error: true + use_pypi: true diff --git a/.gitignore b/.gitignore index c01ff99a..2ca01461 100755 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__/ *.py[cod] *$py.class +.DS_Store # C extensions *.so diff --git a/README.md b/README.md index 5933735f..6f64f789 100644 --- a/README.md +++ b/README.md @@ -38,11 +38,11 @@ pip install -r requirements.txt * [`coordinator`](https://github.com/delftdata/styx/tree/main/coordinator) Styx coordinator. -* [`demo`](https://github.com/delftdata/styx/tree/main/benchmark) +* [`demo`](https://github.com/delftdata/styx/tree/main/demo) The YCSB-T, Deathstar, TPC-C and scalability benchmarks we used for the experiments. * [`grafana`](https://github.com/delftdata/styx/tree/main/grafana) - The confinguration files for the deployment of our visualization dashboards. + The configuration files for the deployment of our visualization dashboards. 
* [`styx-package`](https://github.com/delftdata/styx/tree/main/styx-package) The Styx framework Python package. @@ -50,7 +50,7 @@ pip install -r requirements.txt * [`tests`](https://github.com/delftdata/styx/tree/main/tests) Tests for the worker components of Styx. -* [`worker`](https://github.com/delftdata/styx/tree/main/styx-package) +* [`worker`](https://github.com/delftdata/styx/tree/main/worker) Styx worker. ## Container images @@ -162,9 +162,9 @@ To clear kafka: `docker compose -f docker-compose-kafka.yml down --volumes` ### S3 (Rustfs) -To run self-hosted S3: `docker-compose up -f docker-compose-s3.yml up` +To run self-hosted S3: `docker compose -f docker-compose-s3.yml up` -To clear self-hosted S3: `docker-compose -f docker-compose-s3.yml down --volumes` +To clear self-hosted S3: `docker compose -f docker-compose-s3.yml down --volumes` --- @@ -172,9 +172,13 @@ Then, you can start the Styx engine and specify the desired scale. ### Styx Engine -To run the SE: `docker-compose up --build --scale worker=4` +To run the SE: `docker compose up --build --scale worker=4` -To clear the SE: `docker-compose down --volumes` +The worker service in `docker-compose.yml` sets `INGRESS_TYPE=KAFKA`. If you +start workers manually outside Docker Compose, set `INGRESS_TYPE=KAFKA` together +with the Kafka, coordinator, and S3-compatible storage environment variables. + +To clear the SE: `docker compose down --volumes` ##### Cite Styx diff --git a/docs/styx-docs/docker-env-variables.md b/docs/styx-docs/docker-env-variables.md index 22c6793c..ac25f544 100644 --- a/docs/styx-docs/docker-env-variables.md +++ b/docs/styx-docs/docker-env-variables.md @@ -1,119 +1,113 @@ # Environment Variables -## Styx Coordinator +This page lists the environment variables read by Styx runtime components. Many values are read when Python modules are imported, so set them before starting the coordinator, workers, clients, or tests. 
-The following environment variables configure the behavior of the **Styx Coordinator**, including heartbeats, Kafka settings, snapshotting, and object storage. +Boolean values are parsed with Python's `strtobool`, so values such as `true`, `false`, `1`, `0`, `yes`, and `no` are accepted. ---- +Styx supports S3-compatible object storage for snapshots. The local Docker Compose setup uses +RustFS, but any compatible endpoint can be used with the `S3_*` variables. -### 🧭 Core Configuration +## Required Runtime Config -| Variable | Default Value | Description | -|---------------------------|-------------------------|---------------------------------------------| -| `KAFKA_URL` | `KAFKA_HOST:KAFKA_PORT` | Kafka bootstrap server for messaging | -| `HEARTBEAT_LIMIT` | `5000` (ms) | Max time before a worker is considered dead | -| `HEARTBEAT_CHECK_INTERVAL` | `500` (ms) | How often to check worker heartbeats | -| `MAX_OPERATOR_PARALLELISM` | `10` | Max number of operator partitions | -| `PROTOCOL` | `Protocols.Aria` | Transaction execution protocol used by Styx | +These variables are required for a typical Kafka-backed Styx deployment. ---- +| Variable | Component | Default | Description | +|----------|-----------|---------|-------------| +| `KAFKA_URL` | Coordinator, worker | Required | Kafka bootstrap server used for metadata, ingress, and egress topics. | +| `S3_ENDPOINT` | Coordinator, worker | Required | Full URL for the S3-compatible object store. | +| `S3_ACCESS_KEY` | Coordinator, worker | Required | Access key for the S3-compatible object store. | +| `S3_SECRET_KEY` | Coordinator, worker | Required | Secret key for the S3-compatible object store. | +| `DISCOVERY_HOST` | Worker | Required | Coordinator hostname or IP used by workers. | +| `DISCOVERY_PORT` | Worker | Required | Coordinator control-plane port used by workers. 
| -### 🪣 Snapshot & State +## Common Execution Config -| Variable | Default Value | Description | -|------------------------------------|------------------------|----------------------------------------------| -| `SNAPSHOT_BUCKET_NAME` | `styx-snapshots` | S3/MinIO bucket for storing snapshots | -| `SNAPSHOT_FREQUENCY_SEC` | `10` (seconds) | How often to take a snapshot | -| `SNAPSHOT_COMPACTION_INTERVAL_SEC`| `10` (seconds) | Interval for compacting snapshots | +These variables affect shared Styx package behavior and should be kept consistent across the processes that participate in the same cluster. ---- +| Variable | Default | Description | +|----------|---------|-------------| +| `ENABLE_COMPRESSION` | `true` | Compress MessagePack-serialized internal TCP messages larger than `COMPRESS_AFTER` with Zstandard. | +| `COMPRESS_AFTER` | `4096` | Serialized message size, in bytes, above which `ENABLE_COMPRESSION` can apply. | +| `USE_COMPOSITE_KEYS` | `true` | Honor operator `composite_key_hash_params` during partitioning. When disabled, Styx hashes the full key. | +| `SNAPSHOT_BUCKET_NAME` | `styx-snapshots` | Bucket used for worker, coordinator, and compactor snapshots. | +| `S3_REGION` | `us-east-1` | Region passed to the S3 client. | -### ⚙️ MinIO / Object Storage +## Coordinator Config -| Variable | Source | Description | -|-----------------------|----------------|------------------------------------| -| `S3_ENDPOINT` | Required | Full URL to connect to S3/MinIO | -| `S3_ACCESS_KEY` | Required | Access key for the S3/MinIO user | -| `S3_SECRET_KEY` | Required | Secret key for the S3/MinIO user | -| `S3_REGION` | `us-east-1` | S3 region for the client | -| `S3_INIT_RETRY_SEC` | `2` (seconds) | Sleep time between bucket init retries | -| `S3_INIT_MAX_RETRIES`| `30` | Max retry attempts before coordinator exits (0 = infinite) | +These variables configure the coordinator process. 
---- +### Kafka And Heartbeats -### 🛡️ Fault Tolerance & Restart +| Variable | Default | Description | +|----------|---------|-------------| +| `HEARTBEAT_LIMIT` | `5000` | Time in milliseconds before a worker is considered unhealthy. | +| `HEARTBEAT_CHECK_INTERVAL` | `1000` | Time in milliseconds between coordinator heartbeat checks. | +| `KAFKA_REPLICATION_FACTOR` | `3` | Replication factor used when the coordinator creates Styx Kafka topics. | -| Variable | Default Value | Description | -|----------------------------------|----------------|------------------------------------------------------------------------------------------------------| -| `MAX_WAIT_FOR_RESTARTS_SEC` | `0` (seconds) | How long to wait for the failed container(s) to restart before Styx initiates the automatic recovery | +### Snapshots And Object Storage ---- -## Styx Worker +| Variable | Default | Description | +|----------|---------|-------------| +| `SNAPSHOT_FREQUENCY_SEC` | `30` | Time in seconds between coordinator snapshot attempts. | +| `COMPACT_SNAPSHOTS` | `false` | Whether the coordinator should trigger snapshot compaction after a new completed snapshot. | +| `S3_INIT_RETRY_SEC` | `2` | Time in seconds to wait between object-store initialization retries. | +| `S3_INIT_MAX_RETRIES` | `30` | Maximum object-store initialization attempts before exiting. Set to `0` to retry forever. | -These environment variables configure the **Styx Worker**, including discovery, parallelism, heartbeat, snapshotting, and conflict resolution. +### Recovery ---- +| Variable | Default | Description | +|----------|---------|-------------| +| `MAX_WAIT_FOR_RESTARTS_SEC` | `0` | Time in seconds to wait for failed workers to restart before recovery begins. 
| -### 🧭 Discovery & Coordination +## Worker Config -| Variable | Required / Default | Description | -|----------------------|--------------------|--------------------------------------| -| `DISCOVERY_HOST` | Required | Hostname or IP of the Coordinator | -| `DISCOVERY_PORT` | Required | Port used to communicate with Coordinator | +These variables configure worker processes and the Aria execution protocol. ---- +### Discovery, Ingress, And Heartbeats -### ⚙️ Kafka & Heartbeat +| Variable | Default | Description | +|----------|---------|-------------| +| `INGRESS_TYPE` | None | Ingress backend. Set to `KAFKA` for Kafka topic assignment. The local Docker Compose deployment already sets this for workers. | +| `HEARTBEAT_INTERVAL` | `500` | Time in milliseconds between worker heartbeats. | +| `WORKER_THREADS` | `1` | Number of Styx worker processes started inside a worker container. | -| Variable | Default Value | Description | -|--------------------|----------------|--------------------------------------------| -| `KAFKA_URL` | Required | Kafka broker address | -| `HEARTBEAT_INTERVAL` | `500` (ms) | Frequency at which the worker sends heartbeats | +### Epochs, Kafka, And Egress ---- +| Variable | Default | Description | +|----------|---------|-------------| +| `SEQUENCE_MAX_SIZE` | `1000` | Maximum number of transactions sequenced in one Aria epoch. | +| `EPOCH_INTERVAL_MS` | `1` | Kafka egress polling interval in milliseconds while draining outputs during recovery. 
| -### 🧵 Parallelism & Threads +### Conflict Detection And Fallback -| Variable | Default Value | Description | -|---------------------|----------------|--------------------------------------------------------------| -| `WORKER_THREADS` (`N_THREADS`) | `1` | Number of Styx workers within the container | -| `SNAPSHOTTING_THREADS` | `4` | Threads dedicated to snapshotting | +| Variable | Default | Description | +|----------|---------|-------------| +| `CONFLICT_DETECTION_METHOD` | `0` | Aria conflict detection mode: `0` serializable, `1` deterministic reordering, `2` snapshot isolation. | +| `FALLBACK_STRATEGY_PERCENTAGE` | `-0.1` | Abort-rate threshold for running fallback. The default negative value enables fallback whenever an epoch has aborts. | ---- +### Snapshotting And Migration -### 🪣 Snapshotting +| Variable | Default | Description | +|----------|---------|-------------| +| `SNAPSHOTTING_THREADS` | `4` | Number of threads used by the Aria snapshotting executor. | +| `MIGRATION_THREADS` | `4` | Number of processes used for worker-side migration/repartitioning work. | +| `USE_ASYNC_MIGRATION` | `true` | Whether workers send migrating state asynchronously while the protocol is running after migration restart. | +| `ASYNC_MIGRATION_BATCH_SIZE` | `2000` | Maximum number of state items included in each async migration batch. | -| Variable | Default Value | Description | -|------------------------|--------------------|---------------------------------------------| -| `SNAPSHOT_BUCKET_NAME` | `styx-snapshots` | Bucket where snapshots are stored | -| `SNAPSHOT_FREQUENCY` | `10` (seconds) | Snapshot frequency in epochs | +## Advanced Networking And Queues ---- +These variables tune socket buffers, socket pooling, and worker queue backpressure. They are primarily useful when profiling high-throughput deployments. 
-### 📦 Object Storage (MinIO) +| Variable | Default | Description | +|----------|---------|-------------| +| `SOCKET_SND_BUF` | `4194304` | TCP send buffer size in bytes for Styx sockets. | +| `SOCKET_RCV_BUF` | `4194304` | TCP receive buffer size in bytes for Styx sockets. | +| `SOCKET_POOL_SIZE` | `16` | Number of pooled TCP connections per target `(host, port)` in the Styx networking manager. | +| `PROTOCOL_QUEUE_SIZE` | `10000` | Maximum number of queued protocol-plane messages per worker. | +| `CONTROL_QUEUE_SIZE` | `10000` | Maximum number of queued control-plane messages per worker. | +| `PROTOCOL_WORKERS` | `100` | Number of concurrent protocol queue workers handling protocol-plane messages. | -| Variable | Source | Description | -|---------------------|----------------|------------------------------------| -| `MINIO_URL` | `MINIO_HOST:MINIO_PORT` | Address of the MinIO server | -| `MINIO_ACCESS_KEY` | Required | MinIO access key | -| `MINIO_SECRET_KEY` | Required | MinIO secret key | +## Notes ---- - -### ๐Ÿ“ Conflict Detection & Strategy - -| Variable | Default Value | Description | -|------------------------------|----------------|----------------------------------------------------------------------------| -| `CONFLICT_DETECTION_METHOD` | `0` | Styx's conflict detection strategy | -| `FALLBACK_STRATEGY_PERCENTAGE` | `-0.1` | % aborts before fallback logic triggers (negative enables it at all times) | - ---- - -### ⏱️ Epoch & Sequence Control - -| Variable | Default Value | Description | -|----------------------|--------------------|---------------------------------------------------| -| `EPOCH_INTERVAL_MS` | `1` (ms) | Kafka polling rate | -| `SEQUENCE_MAX_SIZE` | `1000` | Max size of a transactional epoch per Styx worker | - ---- +`Protocols.Aria` is currently hardcoded and is not configured through an +environment variable. 
diff --git a/docs/styx-docs/quickstart.md b/docs/styx-docs/quickstart.md index 9b842ac6..3f0a663f 100644 --- a/docs/styx-docs/quickstart.md +++ b/docs/styx-docs/quickstart.md @@ -7,7 +7,7 @@ local runner so that you don't have to deploy a Styx cluster for debugging and s Requirements: - - A `Python 3.13` environment + - A `Python 3.14` environment - `Docker` - `Docker Compose` @@ -23,13 +23,28 @@ Install the styx-package: pip install ./styx-package/ ``` +The local cluster script starts Kafka, RustFS, the coordinator, and workers with +Docker Compose. It also runs `docker system prune -f --volumes` before starting +the cluster, which removes unused Docker objects and volumes from your Docker +host. + Next start a Styx cluster by calling: ```shell -./scripts/start_styx_cluster.sh [scale_factor] [epoch_size] +./scripts/start_styx_cluster.sh [scale_factor] [epoch_size] [threads_per_worker] [enable_compression] [use_composite_keys] ``` -`scale_factor` is how many Styx workers you want deployed and `epoch_size` the size of a transactional epoch in terms of number of transactions. +`scale_factor` is how many Styx workers you want deployed, `epoch_size` is the size of a transactional epoch in terms of number of transactions, and `threads_per_worker` controls how many threads run inside each worker container. + +`enable_compression` controls whether Styx compresses large MessagePack-serialized internal TCP messages with Zstandard. Compression is enabled only for messages larger than the `COMPRESS_AFTER` threshold, which defaults to 4096 bytes. Leave this enabled unless you are explicitly comparing compression overhead. + +`use_composite_keys` controls whether operators honor `composite_key_hash_params` during partitioning. When enabled, an operator can route a string key by one selected field, for example the first field in `warehouse:district:customer`; when disabled, Styx hashes the full key string. 
Leave this enabled for workloads such as TPC-C that define composite-key partitioning. + +For example, to start four workers with epochs of 1000 transactions, one worker thread per container, compression enabled, and composite keys enabled: + +```shell +./scripts/start_styx_cluster.sh 4 1000 1 true true +``` -Now you are ready to submit your first stateful dataflow graph to the Styx cluster for processing! \ No newline at end of file +Now you are ready to submit your first stateful dataflow graph to the Styx cluster for processing! diff --git a/docs/styx-docs/running-experiments.md b/docs/styx-docs/running-experiments.md index eb5bf2c5..7c272ec3 100644 --- a/docs/styx-docs/running-experiments.md +++ b/docs/styx-docs/running-experiments.md @@ -8,7 +8,7 @@ This guide covers how to run Styx benchmark experiments in all supported deploym | Mode | Description | |------|-------------| -| `docker-compose` | Local cluster using Docker Compose. Default for migration/scalability scripts. | +| `docker-compose` | Local cluster using Docker Compose. Default for experiment, migration, and scalability scripts. | | `k8s-minikube` | Kubernetes via minikube. Builds and loads images locally. | | `k8s-cluster` | Generic Kubernetes cluster (e.g. on-prem or cloud). Uses pre-published images. | @@ -32,6 +32,11 @@ The mode is controlled by the `DEPLOY_MODE` environment variable. |------|---------| | Docker + Docker Compose | Running the full Styx stack locally | +The docker-compose scripts call `scripts/start_styx_cluster.sh`, which runs +`docker system prune -f --volumes` before starting a fresh local cluster. This +removes unused Docker objects and volumes from your Docker host, not just Styx +containers. + ### k8s-minikube mode | Tool | Purpose | @@ -123,15 +128,15 @@ sudo kubefwd --version | 9 | `warmup_seconds` | Seconds excluded from metrics | | 10 | `epoch_size` | Max transactions per Aria epoch (e.g. 
`1000`) | | 11 _(optional)_ | `styx_threads_per_worker` | Worker threads per container (default: `1`) | -| 12 _(optional)_ | `enable_compression` | `true`/`false` — ZSTD snapshot compression (default: `true`) | -| 13 _(optional)_ | `use_composite_keys` | `true`/`false` — composite key hashing (default: `true`) | +| 12 _(optional)_ | `enable_compression` | `true`/`false` — compress large MessagePack-serialized internal TCP messages with Zstandard (default: `true`) | +| 13 _(optional)_ | `use_composite_keys` | `true`/`false` — honor operator `composite_key_hash_params` when partitioning keys (default: `true`) | | 14 _(optional)_ | `regenerate_tpcc_data` | `true`/`false` — force TPC-C data regeneration (default: `false`) | ### Environment variables | Variable | Default | Description | |----------|---------|-------------| -| `DEPLOY_MODE` | `k8s-minikube` | Deployment mode: `docker-compose`, `k8s-minikube`, `k8s-cluster` | +| `DEPLOY_MODE` | `docker-compose` | Deployment mode: `docker-compose`, `k8s-minikube`, `k8s-cluster` | | `RELEASE_NAME` | `styx-cluster` | Helm release name (k8s modes) | | `NAMESPACE` | `styx` | Kubernetes namespace (k8s modes) | diff --git a/requirements.txt b/requirements.txt index 4e9d1320..aec59ab2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,5 +33,5 @@ mkdocs-material==9.7.6 pytest==9.0.3 pytest-asyncio==1.3.0 testcontainers[kafka,minio]>=4.14.2 -# make sure to have the same ruff verseion as in .github/workflows/ruff.yml line 30 -ruff==0.15.12 \ No newline at end of file +# make sure to have the same ruff version as in .github/workflows/ruff.yml line 30 +ruff==0.15.12