DataFlux is a high-performance, functional data processing engine built for modern Machine Learning pipelines. It provides a clean, fluent API for streaming and transforming data from any source while maintaining strict compatibility with PyTorch and Hugging Face.
Part of the Modular Quartet: LogFlow, Confluid, Liquify, and DataFlux.
- Functional Purity: Transforms are simple Python callables. No complex base classes required.
- Standardized Sample Triplet: Standardizes on
(input, target, metadata)for full traceability. - High Performance: Native multiprocess support via
.parallel(workers=N)using the safespawncontext. - Advanced Storage: Built-in support for high-performance backends:
- HDF5: Clean, efficient read/write.
- Zarr: Cloud-native, concurrent storage (Group and Batch modes).
- Directory: Robust concurrent writing for irregular data lengths.
- Passive Introspection: Automatically generates JSON manifests for visual orchestration in FluxStudio.
- 100% Reproducibility: Entire pipelines are serializable via Confluid manifests.
- Functional API: Provide a lazy, chainable pipeline API (
map,filter,batch). - Standardized Samples: Use the
Sample(input, target, metadata)triplet as the primary data unit. - Parallel Execution: Support high-performance multiprocess execution via
.parallel(workers=N)using thespawncontext.
- High-Performance Sinks: Native support for HDF5 (sequential), Zarr (concurrent), and Directory (irregular) storage.
- JointFlux Pattern: Support aggregating multiple heterogeneous data sources into a single stream, preserving per-source transform chains.
- Passive Introspection: Automatically discover available tools and ops for serialized manifests.
- Serialization Symmetry: Ensure full-pipeline states are serializable and reconstructible via Confluid.
import numpy as np
from dataflux.core import Flux
# 1. Define a simple transformation
def normalize(data: np.ndarray, mean: float = 0.0):
return data - mean
# 2. Build a pipeline
raw_data = [np.random.randn(10) for _ in range(100)]
flux = Flux(raw_data) \
.map(normalize, mean=0.5) \
.filter(lambda s: s.input.mean() > 0) \
.parallel(workers=4)
# 3. Collect or stream
for sample in flux:
print(sample.input.shape)DataFlux makes it easy to move data between different formats:
from dataflux.storage.hdf5 import HDF5Source
from dataflux.storage.zarr import ZarrGroupSink
# Stream from HDF5 to Zarr in parallel
Flux.from_source(HDF5Source("input.h5")) \
.parallel(workers=8) \
.map(heavy_op) \
.to_sink(ZarrGroupSink("output.zarr"))pip install git+https://github.com/Gearlux/dataflux.git@mainMIT