diff --git a/README.md b/README.md index 099c88b..9c5b193 100644 --- a/README.md +++ b/README.md @@ -2,208 +2,222 @@ Processing event or streaming data presents several technological challenges. A variety of technologies are often used by scientific user facilities. ZMQ is used to stream data and messages in a peer-to-peer fashion. Message brokers like Kafka, Redis and RabbitMQ are often employed to route and pass messages from instruments to processing workflows. Arroyo provides an API and structure to flexibly integrate with these tools and incorporate arbitrarily complex processing workflows, letting the hooks to the workflow code be independent of the connection code and hence reusable at a variety of instruments. -The basic structure of building an arroyo implementation is to implement groups of several classes: -- +## Core Concepts + +The basic structure of building an arroyo implementation is to implement groups of several classes: + - `Operator` - receives `Messages` from a `Listener` and can optionally send `Messages` to one or more `Publisher` instances - `Listener` - receives `Messages` from the external world, parses them into arroyo `Message` objects, and sends them to an `Operator` - `Publisher` - receives `Messages` from an `Operator` and publishes them to the outside world +- `Block` - a container that holds one operator with any number of listeners and publishers +## Configuration-Based Deployment +Arroyo supports declarative configuration via YAML files, making it easy to deploy and configure pipelines without writing code: +```yaml +blocks: + - name: my_pipeline + description: Process messages from ZMQ -Arroyo is un-opinionated about deployment decsions. It is intended support listener-operator-publisher groups in: -- Single process -- Chain of processes where listening, processing and publishing can linked together through a protocol like ZMQ. One process's publisher can communicate with another process's listener, etc. 
+ operator: + class: myapp.operators.MessageProcessor + kwargs: + timeout: 30 -This library is intended to provide classes, and will also include more specific common subclasses, like those that communicate over ZMQ or Redis. + listeners: + - class: arroyopy.zmq.ZMQListener + kwargs: + address: 'tcp://127.0.0.1:5555' + publishers: + - class: arroyopy.redis.RedisPublisher + kwargs: + channel: processed_data +``` +Run from the command line: +```bash +arroyo run config/pipeline.yaml +``` -```mermaid +See [docs/configuration.md](docs/configuration.md) for full details. ---- -title: Some sweet classes +## Quick Start -note: I guess we use "None" instead of "void" ---- +### Installation -classDiagram - namespace listener{ +```bash +pip install arroyopy +``` - class Listener{ - operator: Operator +With optional dependencies: +```bash +# Install with ZMQ support +pip install arroyopy[zmq] - *start(): None - *stop(): None - } +# Install with Redis support +pip install arroyopy[redis] +# Install everything for development +pip install arroyopy[dev] +``` - } +### Running a Pipeline - namespace operator{ - class Operator{ - publisher: List[Publisher] - *process(Message): None - add_publisher(Publisher): None - remove_publisher(Publisher): None +1. **Create a configuration file** (`pipeline.yaml`): - } - } +```yaml +blocks: + - name: simple_pipeline + description: Listen on ZMQ, process, publish to Redis - namespace publisher{ - class Publisher{ - *publish(Message): None - } + operator: + class: myapp.operators.SimpleProcessor - } + listeners: + - class: arroyopy.zmq.ZMQListener + kwargs: + address: 'tcp://127.0.0.1:5555' + socket_type: 'SUB' - namespace message{ + publishers: + - class: arroyopy.redis.RedisPublisher + kwargs: + host: localhost + channel: output +``` - class Message{ +2. **Run the pipeline**: - } +```bash +arroyo run pipeline.yaml +``` - class Start{ - data: Dict - } +3. 
**Validate your configuration** (optional): - class Stop{ - data: Dict - } +```bash +arroyo validate pipeline.yaml +``` - class Event{ - metadata: Dict - payload: bytes - } - } +### Multiple Blocks + +You can define multiple blocks in a single configuration file: + +```yaml +blocks: + - name: data_ingestion + operator: + class: myapp.operators.Ingestor + listeners: + - class: arroyopy.zmq.ZMQListener + kwargs: + address: 'tcp://127.0.0.1:5555' + publishers: + - class: arroyopy.redis.RedisPublisher + kwargs: + channel: raw_data + + - name: data_processing + operator: + class: myapp.operators.Processor + listeners: + - class: arroyopy.redis.RedisListener + kwargs: + channel: raw_data + publishers: + - class: arroyopy.zmq.ZMQPublisher + kwargs: + address: 'tcp://127.0.0.1:5556' +``` - namespace zmq{ - class ZMQListener{ - operator: Operator - socket: zmq.Socket - } +Run all blocks: +```bash +arroyo run config.yaml +``` - class ZMQPublisher{ - host: str - port: int - } +Or run a specific block: +```bash +arroyo run config.yaml --block data_ingestion +``` - } +## Deployment Options - namespace redis{ +Arroyo is un-opinionated about deployment decisions. It is intended to support listener-operator-publisher groups in: +- Single process +- Chain of processes where listening, processing and publishing can be linked together through a protocol like ZMQ. One process's publisher can communicate with another process's listener, etc. +- Configuration-based deployment via YAML files and CLI - class RedisListener{ - operator: Redis.client - pubsub: Redis.pubsub - } +This library is intended to provide base classes, and will also include more specific common subclasses, like those that communicate over ZMQ or Redis. - class RedisPublisher{ - pubsub: Redis.pubsub - } - } +## Developer Installation +### Option 1: Pixi (Recommended) +[Pixi](https://pixi.sh) provides reproducible environments across all platforms with automatic dependency resolution. 
- Listener <|-- ZMQListener - ZMQListener <|-- ZMQPubSubListener - Listener o-- Operator +```bash +# Install Pixi +curl -fsSL https://pixi.sh/install.sh | bash - Publisher <|-- ZMQPublisher - ZMQPublisher <|-- ZMQPubSubPublisher +# Clone and navigate to the repository +git clone https://github.com/als-computing/arroyopy.git +cd arroyopy - Publisher <|-- RedisPublisher - Listener <|-- RedisListener - Operator o-- Publisher - Message <|-- Start - Message <|-- Stop - Message <|-- Event +# Install development environment +pixi install -e dev +# Run tests +pixi run -e dev test +# Run pre-commit checks +pixi run -e dev pre-commit ``` -## -In-process, listening for ZMQ - -Note that this leaves Concrete classes undefined as placeholders - -TODO: parent class labels -```mermaid +### Option 2: Conda/Mamba Environment -sequenceDiagram - autonumber - ExternalPublisher ->> ZMQPubSubListener: publish(bytes) - loop receiving thread - activate ZMQPubSubListener - ZMQPubSubListener ->> ConcreteMessageParser: parse(bytes) - ZMQPubSubListener ->> MessageQueue: put(bytes) - deactivate ZMQPubSubListener - - - ZMQPubSubListener ->> MessageQueue: message(Message) - end - activate ConcreteOperator - loop polling thread - ConcreteOperator ->> MessageQueue: get(bytes) - end - loop processing thread - ConcreteOperator ->> ConcreteOperator: calculate() - - ConcreteOperator ->> ConcretePublisher: publish() - end - deactivate ConcreteOperator +```bash +conda create -n arroyopy python=3.11 +conda activate arroyopy +pip install -e '.[dev]' ``` -# Devloper installation +### Option 3: Virtual Environment (venv) -## Conda environment -We use pixi to be forward thinking tio help with CI. We like it because it helps you easily test that dependencies for a variety of architects can resolve. - -However, at the time of writing we can't figure out how to get it to be a good developer experience. 
So, we create a conda environment like (note that at this time, we are using python 3.11 because of numpy and wheel availability): - -``` -conda create -n arroyo python=3.11 -conda activate arroyo +```bash +python -m venv .venv +source .venv/bin/activate pip install -e '.[dev]' ``` -## pre-commit -We use `pre-commit` in CI so you want to use it before commiting. -To test that your branches changes are all good, type: +## Pre-commit -``` +We use `pre-commit` for code quality checks. To test your changes: + +```bash pre-commit run --all-files ``` -Since our configuration of `pre-commit` uses `black`, it's possible that it will change files. If you like the changes, you can add them to your `git` commit with +If `pre-commit` (including `black` formatter) makes changes: -``` +```bash git add . +pre-commit run --all-files ``` -Then you can run `pre-commit run --all-files` again. - -## pixi -We use `pixi` for CI in github action. It's great for that but can't get our favorite developr tools to use the python environments that `pixi` creaetes in the `.pixi` folder. If you want to play with `pixi`, here are some tips: - -To setup a development environment: - -* Git clone this repo and CD into the directory -* Install [pixi](https://pixi.sh/v0.33.0/#installation) -* Install dependencies with -''' -pixi install -''' -* run pre-commit on the files -''' -pixi r pre-commit -''' +## Running Tests +With Pixi: +```bash +pixi run -e dev test +``` -* Run pytest with -''' -pixi r test -''' +With pip/conda: +```bash +pytest src/_test/ +``` # Copyright Arroyo Stream Processing Toolset (arroyopy) Copyright (c) 2025, The Regents of the University of California, through Lawrence Berkeley National Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy).