FinDrum is a lightweight Python framework for building and orchestrating data pipelines with extensible architecture via operators, datasources, schedulers, and triggers.
This repository (FinDrum-Platform) is the core package and is meant to be used as a library. Custom logic (pipelines and extensions) should be defined in external projects.
```bash
pip install findrum-platform
```

FinDrum pipelines are defined in YAML files and can include:
- A sequence of operators
- A datasource (provides data from an external source)
- A scheduler (runs the pipeline periodically)
- An event trigger (responds to real-time events)
A scheduled pipeline:

```yaml
scheduler:
  type: MyCustomScheduler

pipeline:
  - id: batch_ingest
    datasource: MyDataSource
    params:
      key: value
  - id: transform
    operator: MyOperator
    depends_on: batch_ingest
    params:
      key: value
```

An event-driven pipeline:

```yaml
event:
  type: MyTrigger
  config:
    key: value

pipeline:
  - id: step1
    operator: MyOperator
    depends_on: MyTrigger
    params:
      key: value
  - id: step2
    operator: DownstreamOperator
    depends_on: step1
```

FinDrum provides a minimal interface for each pipeline component. These are abstract base classes that must be subclassed by your custom logic.
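The `depends_on` fields in the YAML above determine execution order: a step runs only after the steps it depends on. Purely as an illustration (this is not FinDrum's internal code), that ordering amounts to a topological sort:

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Step id -> set of step ids it depends on, mirroring the scheduled
# pipeline example above.
deps = {
    "batch_ingest": set(),
    "transform": {"batch_ingest"},
}

# static_order() yields each step only after all of its dependencies.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['batch_ingest', 'transform']
```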
```python
from findrum.interfaces import Operator

class MyOperator(Operator):
    def run(self, input_data):
        ...
```

Use this when defining a step in a pipeline. It must implement `run(input_data)`; we recommend returning a `pandas.DataFrame`.
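As a concrete sketch, here is a hypothetical operator. The base class is stubbed so the snippet runs standalone; in a real project you would subclass `findrum.interfaces.Operator` instead, and the operator name and column are illustrative:

```python
import pandas as pd

class Operator:
    """Stub standing in for findrum.interfaces.Operator."""
    def run(self, input_data):
        raise NotImplementedError

class UppercaseOperator(Operator):
    """Hypothetical operator: upper-cases one column of a DataFrame."""
    def __init__(self, column="name"):
        self.column = column

    def run(self, input_data):
        df = input_data.copy()
        df[self.column] = df[self.column].str.upper()
        return df  # a pandas.DataFrame, as recommended

result = UppercaseOperator().run(pd.DataFrame({"name": ["ada", "linus"]}))
print(result["name"].tolist())  # ['ADA', 'LINUS']
```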
```python
from findrum.interfaces import DataSource

class MySource(DataSource):
    def fetch(self, **kwargs):
        ...
```

It feeds the pipeline with data; we recommend that `fetch` returns a `pandas.DataFrame`.
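A minimal sketch of a datasource, with the base class stubbed so it runs standalone (a real project subclasses `findrum.interfaces.DataSource`, and the source name here is hypothetical):

```python
import pandas as pd

class DataSource:
    """Stub standing in for findrum.interfaces.DataSource."""
    def fetch(self, **kwargs):
        raise NotImplementedError

class StaticSource(DataSource):
    """Hypothetical datasource: turns in-memory rows into a DataFrame.
    A real one would query an API, database, or object store."""
    def fetch(self, **kwargs):
        rows = kwargs.get("rows", [])
        return pd.DataFrame(rows)

df = StaticSource().fetch(rows=[{"ticker": "ACME", "price": 9.5}])
print(df.shape)  # (1, 2)
```

The `rows` keyword argument plays the role of the `params` block from the pipeline YAML, which FinDrum passes to `fetch`.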
```python
from findrum.interfaces import Scheduler

class MyScheduler(Scheduler):
    def register(self, scheduler):
        # e.g., add a job to an APScheduler instance
        ...
```

Implements the logic to execute the pipeline on a time interval or schedule.
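A sketch of a scheduler registering an interval job. The `add_job` call mirrors APScheduler's signature, but both the base class and the scheduler object are stand-ins so the snippet is self-contained; all class names are hypothetical:

```python
class Scheduler:
    """Stub standing in for findrum.interfaces.Scheduler."""
    def register(self, scheduler):
        raise NotImplementedError

class EveryFiveMinutes(Scheduler):
    """Hypothetical scheduler: registers an interval job.
    With APScheduler, `scheduler` would be a BackgroundScheduler."""
    def register(self, scheduler):
        scheduler.add_job(self.run_pipeline, "interval", minutes=5)

    def run_pipeline(self):
        print("pipeline run")

class RecordingScheduler:
    """Fake standing in for APScheduler, so the example runs anywhere."""
    def __init__(self):
        self.jobs = []

    def add_job(self, func, trigger, **kwargs):
        self.jobs.append((func.__name__, trigger, kwargs))

aps = RecordingScheduler()
EveryFiveMinutes().register(aps)
print(aps.jobs)  # [('run_pipeline', 'interval', {'minutes': 5})]
```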
```python
from findrum.interfaces import EventTrigger

class MyTrigger(EventTrigger):
    def start(self):
        # start a file watcher, webhook listener, etc.
        ...
```

Runs the pipeline when an external event occurs (e.g., a new Kafka message, or a file landing in MinIO). The trigger should call `self.emit(data)` to push input into the pipeline.
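A sketch of the emit pattern, with `EventTrigger` stubbed so it runs standalone; here `emit` merely records data, whereas the real one pushes it into the pipeline, and the trigger class is hypothetical:

```python
class EventTrigger:
    """Stub standing in for findrum.interfaces.EventTrigger."""
    def __init__(self):
        self.emitted = []

    def emit(self, data):
        # The real emit() pushes data into the pipeline;
        # here we just record it for demonstration.
        self.emitted.append(data)

class ListTrigger(EventTrigger):
    """Hypothetical trigger: emits items from an in-memory list.
    A real trigger would block in start(), watching files or a
    Kafka topic, and emit as events arrive."""
    def __init__(self, items):
        super().__init__()
        self.items = items

    def start(self):
        for item in self.items:
            self.emit(item)

trigger = ListTrigger([{"event": "file_created"}, {"event": "file_removed"}])
trigger.start()
print(len(trigger.emitted))  # 2
```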
You can import and use the main classes provided by FinDrum:

```python
from findrum import Platform
```

`Platform` is the main entrypoint: it manages pipelines, registers them, and runs them based on schedules or events.
After installing `findrum-platform`, a CLI tool is available:

```bash
findrum-run pipelines/my_pipeline.yaml
findrum-run pipelines/my_pipeline.yaml --config config/config.yaml
findrum-run pipelines/my_pipeline.yaml --verbose
```

FinDrum requires a `config.yaml` file with registered class paths:
```yaml
operators:
  - my_project.operators.MyCustomOperator
datasources:
  - my_project.datasources.MyDataSource
schedulers:
  - my_project.schedulers.MyScheduler
triggers:
  - my_project.triggers.MyTrigger
```

This lets FinDrum dynamically import your components.
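Resolving a dotted path to a class can be sketched with the standard library. This illustrates the mechanism, not FinDrum's actual loader, and a stdlib class is looked up so the snippet runs without `my_project` installed:

```python
import importlib

def load_class(dotted_path):
    """Resolve 'package.module.ClassName' to the class object."""
    module_path, class_name = dotted_path.rsplit(".", 1)
    return getattr(importlib.import_module(module_path), class_name)

# A stdlib class is used purely so this example is runnable;
# in practice the path would be e.g. "my_project.operators.MyCustomOperator".
cls = load_class("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```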
```python
from findrum import Platform

platform = Platform("config.yaml")
platform.register_pipeline("pipelines/my_pipeline.yaml")
platform.start()
```

You can also run your pipelines from a Python file (for example, `main.py`) following the example above.
A typical project using FinDrum should look like:

```
your-project/
├── operators/
│   └── my_operator.py
├── schedulers/
│   └── my_scheduler.py
├── triggers/
│   └── my_trigger.py
├── datasources/
│   └── my_datasource.py
├── pipelines/
│   └── my_pipeline.yaml
├── config.yaml
└── main.py  (optional)
```
To get started quickly, FinDrum includes runnable examples in the examples/ folder.