
Processing event or streaming data presents several technological challenges, and scientific user facilities use a variety of technologies to meet them. ZMQ streams data and messages in a peer-to-peer fashion, while message brokers like Kafka, Redis and RabbitMQ route and pass messages from instruments to processing workflows. Arroyo provides an API and structure to integrate flexibly with these tools and to incorporate arbitrarily complex processing workflows, keeping the hooks into workflow code independent of the connection code and hence reusable across a variety of instruments.

## Core Concepts

The basic structure of building an arroyo implementation is to implement groups of several classes:

- `Operator` - receives `Messages` from a listener and can optionally send `Messages` to one or more `Publisher` instances
- `Listener` - receives messages from the external world, parses them into arroyo `Message` objects, and sends them to an `Operator`
- `Publisher` - receives `Messages` from a `Listener` and publishes them to the outside world
- `Block` - a container that holds one operator with any number of listeners and publishers
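As an illustration only, the wiring between these classes might look like the following minimal sketch in plain Python. The class and method names follow the list above, but the concrete publisher here is hypothetical and the actual arroyopy base classes may differ:

```python
from typing import List


class Message:
    """Base message passed between arroyo components."""

    def __init__(self, payload):
        self.payload = payload


class Publisher:
    """Sends messages to the outside world."""

    def publish(self, message: Message) -> None:
        raise NotImplementedError


class Operator:
    """Processes messages and forwards results to its publishers."""

    def __init__(self):
        self.publishers: List[Publisher] = []

    def add_publisher(self, publisher: Publisher) -> None:
        self.publishers.append(publisher)

    def process(self, message: Message) -> None:
        # A real operator would transform the message here
        for publisher in self.publishers:
            publisher.publish(message)


class Listener:
    """Receives external messages and hands them to an operator."""

    def __init__(self, operator: Operator):
        self.operator = operator

    def start(self) -> None:
        raise NotImplementedError


# Hypothetical concrete publisher that collects results in memory
class ListPublisher(Publisher):
    def __init__(self):
        self.received = []

    def publish(self, message: Message) -> None:
        self.received.append(message.payload)


operator = Operator()
sink = ListPublisher()
operator.add_publisher(sink)
operator.process(Message("hello"))
```

A `Block` would then simply hold one such operator together with its listeners and publishers.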

## Configuration-Based Deployment

Arroyo supports declarative configuration via YAML files, making it easy to deploy and configure pipelines without writing code:

```yaml
blocks:
  - name: my_pipeline
    description: Process messages from ZMQ

    operator:
      class: myapp.operators.MessageProcessor
      kwargs:
        timeout: 30

    listeners:
      - class: arroyopy.zmq.ZMQListener
        kwargs:
          address: 'tcp://127.0.0.1:5555'

    publishers:
      - class: arroyopy.redis.RedisPublisher
        kwargs:
          channel: processed_data
```

Run from the command line:
```bash
arroyo run config/pipeline.yaml
```
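Loaders of this kind typically resolve each `class` entry to an importable object and instantiate it with the given `kwargs`. The sketch below shows that general mechanism, using a standard-library class for illustration; it is an assumption about how such a loader works, not the actual arroyo implementation:

```python
import importlib


def instantiate(spec: dict):
    """Resolve a dotted 'class' path and call it with 'kwargs'."""
    module_path, _, class_name = spec["class"].rpartition(".")
    cls = getattr(importlib.import_module(module_path), class_name)
    return cls(**spec.get("kwargs", {}))


# Example spec, shaped like one entry of the parsed YAML above
spec = {"class": "collections.Counter", "kwargs": {"a": 2, "b": 1}}
counter = instantiate(spec)
```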

See [docs/configuration.md](docs/configuration.md) for full details.

## Quick Start

### Installation

```bash
pip install arroyopy
```

With optional dependencies:
```bash
# Install with ZMQ support
pip install arroyopy[zmq]

# Install with Redis support
pip install arroyopy[redis]

# Install everything for development
pip install arroyopy[dev]
```

### Running a Pipeline

1. **Create a configuration file** (`pipeline.yaml`):

```yaml
blocks:
  - name: simple_pipeline
    description: Listen on ZMQ, process, publish to Redis

    operator:
      class: myapp.operators.SimpleProcessor

    listeners:
      - class: arroyopy.zmq.ZMQListener
        kwargs:
          address: 'tcp://127.0.0.1:5555'
          socket_type: 'SUB'

    publishers:
      - class: arroyopy.redis.RedisPublisher
        kwargs:
          host: localhost
          channel: output
```

2. **Run the pipeline**:

```bash
arroyo run pipeline.yaml
```

3. **Validate your configuration** (optional):

```bash
arroyo validate pipeline.yaml
```
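The `myapp.operators.SimpleProcessor` referenced in the configuration is user code. As a hypothetical illustration (the exact arroyopy `Operator` interface may differ), such an operator might look like:

```python
class SimpleProcessor:
    """Hypothetical operator that uppercases text payloads."""

    def __init__(self):
        self.publishers = []

    def add_publisher(self, publisher):
        self.publishers.append(publisher)

    def process(self, message: bytes) -> None:
        # Transform the raw payload, then forward to every publisher
        result = message.decode().upper().encode()
        for publisher in self.publishers:
            publisher.publish(result)


# Tiny in-memory publisher, standing in for the configured RedisPublisher
class CollectPublisher:
    def __init__(self):
        self.out = []

    def publish(self, data):
        self.out.append(data)


proc = SimpleProcessor()
sink = CollectPublisher()
proc.add_publisher(sink)
proc.process(b"hello")
```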

### Multiple Blocks

You can define multiple blocks in a single configuration file:

```yaml
blocks:
  - name: data_ingestion
    operator:
      class: myapp.operators.Ingestor
    listeners:
      - class: arroyopy.zmq.ZMQListener
        kwargs:
          address: 'tcp://127.0.0.1:5555'
    publishers:
      - class: arroyopy.redis.RedisPublisher
        kwargs:
          channel: raw_data

  - name: data_processing
    operator:
      class: myapp.operators.Processor
    listeners:
      - class: arroyopy.redis.RedisListener
        kwargs:
          channel: raw_data
    publishers:
      - class: arroyopy.zmq.ZMQPublisher
        kwargs:
          address: 'tcp://127.0.0.1:5556'
```

Run all blocks:
```bash
arroyo run config.yaml
```

Or run a specific block:
```bash
arroyo run config.yaml --block data_ingestion
```

## Deployment Options

Arroyo is un-opinionated about deployment decisions. It is intended to support listener-operator-publisher groups in:
- Single process
- Chain of processes where listening, processing and publishing can be linked together through a protocol like ZMQ. One process's publisher can communicate with another process's listener, etc.
- Configuration-based deployment via YAML files and CLI
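In the single-process case, chaining amounts to wiring one group's publisher directly to the next group's operator. A minimal sketch under that assumption (hypothetical classes, not the arroyopy API):

```python
class InProcessPublisher:
    """Publisher that hands messages straight to a downstream operator."""

    def __init__(self, downstream_operator):
        self.downstream = downstream_operator

    def publish(self, message):
        self.downstream.process(message)


class CollectingOperator:
    """Second stage: just records what it receives."""

    def __init__(self):
        self.seen = []

    def process(self, message):
        self.seen.append(message)


class DoublingOperator:
    """First stage: doubles the value and forwards it."""

    def __init__(self, publisher):
        self.publisher = publisher

    def process(self, message):
        self.publisher.publish(message * 2)


# Stage 1 publishes in-process into stage 2, with no broker in between
stage2 = CollectingOperator()
stage1 = DoublingOperator(InProcessPublisher(stage2))
stage1.process(21)
```

Swapping `InProcessPublisher` for a ZMQ publisher (and giving stage 2 a matching listener) moves the same pipeline into separate processes without touching the operators.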

This library is intended to provide base classes, and will also include more specific common subclasses, like those that communicate over ZMQ or Redis.

## Developer Installation

### Option 1: Pixi (Recommended)

[Pixi](https://pixi.sh) provides reproducible environments across all platforms with automatic dependency resolution.

```bash
# Install Pixi
curl -fsSL https://pixi.sh/install.sh | bash

# Clone and navigate to the repository
git clone https://github.com/als-computing/arroyopy.git
cd arroyopy

# Install development environment
pixi install -e dev

# Run tests
pixi run -e dev test

# Run pre-commit checks
pixi run -e dev pre-commit
```
### Option 2: Conda/Mamba Environment

```bash
conda create -n arroyopy python=3.11
conda activate arroyopy
pip install -e '.[dev]'
```

### Option 3: Virtual Environment (venv)

```bash
python -m venv .venv
source .venv/bin/activate
pip install -e '.[dev]'
```

## Pre-commit

We use `pre-commit` for code quality checks. To test your changes:

```bash
pre-commit run --all-files
```

If `pre-commit` (including `black` formatter) makes changes:

```bash
git add .
pre-commit run --all-files
```

## Running Tests

With Pixi:
```bash
pixi run -e dev test
```

With pip/conda:
```bash
pytest src/_test/
```

## Copyright
Arroyo Stream Processing Toolset (arroyopy) Copyright (c) 2025, The Regents of the University of California, through Lawrence Berkeley National Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy).