Skip to content

Gearlux/dataflux

Repository files navigation

DataFlux

DataFlux is a high-performance, functional data processing engine built for modern Machine Learning pipelines. It provides a clean, fluent API for streaming and transforming data from any source while maintaining strict compatibility with PyTorch and Hugging Face.

Part of the Modular Quartet: LogFlow, Confluid, Liquify, and DataFlux.

🚀 Key Features

  • Functional Purity: Transforms are simple Python callables. No complex base classes required.
  • Standardized Sample Triplet: Standardizes on (input, target, metadata) for full traceability.
  • High Performance: Native multiprocess support via .parallel(workers=N) using the safe spawn context.
  • Advanced Storage: Built-in support for high-performance backends:
    • HDF5: Clean, efficient read/write.
    • Zarr: Cloud-native, concurrent storage (Group and Batch modes).
    • Directory: Robust concurrent writing for irregular data lengths.
  • Passive Introspection: Automatically generates JSON manifests for visual orchestration in FluxStudio.
  • 100% Reproducibility: Entire pipelines are serializable via Confluid manifests.

🎯 Design Goals & Requirements

Stream Engine

  • Functional API: Provide a lazy, chainable pipeline API (map, filter, batch).
  • Standardized Samples: Use the Sample(input, target, metadata) triplet as the primary data unit.
  • Parallel Execution: Support high-performance multiprocess execution via .parallel(workers=N) using the spawn context.

Storage

  • High-Performance Sinks: Native support for HDF5 (sequential), Zarr (concurrent), and Directory (irregular) storage.
  • JointFlux Pattern: Support aggregating multiple heterogeneous data sources into a single stream, preserving per-source transform chains.

Metadata & Discovery

  • Passive Introspection: Automatically discover available tools and ops for serialized manifests.
  • Serialization Symmetry: Ensure full-pipeline states are serializable and reconstructible via Confluid.

🛠 Quick Start

import numpy as np
from dataflux.core import Flux

# 1. Define a simple transformation
def normalize(data: np.ndarray, mean: float = 0.0):
    return data - mean

# 2. Build a pipeline
raw_data = [np.random.randn(10) for _ in range(100)]

flux = Flux(raw_data) \
    .map(normalize, mean=0.5) \
    .filter(lambda s: s.input.mean() > 0) \
    .parallel(workers=4)

# 3. Collect or stream
for sample in flux:
    print(sample.input.shape)

📦 Storage Integration

DataFlux makes it easy to move data between different formats:

from dataflux.storage.hdf5 import HDF5Source
from dataflux.storage.zarr import ZarrGroupSink

# Stream from HDF5 to Zarr in parallel
Flux.from_source(HDF5Source("input.h5")) \
    .parallel(workers=8) \
    .map(heavy_op) \
    .to_sink(ZarrGroupSink("output.zarr"))

🔧 Installation

pip install git+https://github.com/Gearlux/dataflux.git@main

📄 License

MIT

About

High-performance, functional data processing engine built for modern Machine Learning pipelines. It provides a clean, fluent API for streaming and transforming data from any source while maintaining strict compatibility with PyTorch and Hugging Face.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages