Merged
75 changes: 74 additions & 1 deletion README.md
@@ -306,7 +306,9 @@ To explicitly enable or disable colorization, you may set the following environm
 Datatrove supports a wide variety of input/output sources through [fsspec](https://filesystem-spec.readthedocs.io/en/latest/).

 There are a few ways to provide a path to a datatrove block (for `input_folder`, `logging_dir`, `data_folder` and so on arguments):
-- `str`: the simplest way is to pass a single string. Example: `/home/user/mydir`, `s3://mybucket/myinputdata`, `hf://datasets/allenai/c4/en/`
+- `str`: the simplest way is to pass a single string. Example: `/home/user/mydir`, `s3://mybucket/myinputdata`, `hf://buckets/myorg/my-bucket/raw/`, `hf://datasets/allenai/c4/en/`
+
+> Use `hf://buckets/...` for raw and intermediate data (S3-like, mutable, no versioning). Use `hf://datasets/...` for datasets ready to be published. See [HF Storage Buckets](#hf-storage-buckets) below.

 - `(str, fsspec filesystem instance)`: a string path and a fully initialized filesystem object. Example: `("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))`
 - `(str, dict)`: a string path and a dictionary with options to initialize a fs. Example (equivalent to the previous line): `("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})`
@@ -352,6 +354,8 @@ Set `rollouts_per_document` to automatically run the same rollout multiple times

For a ready-to-use script for synthetic data generation at scale (supporting models from 1B to 1T parameters, local/SLURM execution, and multi-node setups), see [`generate_data.py`](examples/inference/generate_data.py). This script handles prompt-based generation with configurable system prompts and templates.

> For raw generation output, write to `hf://buckets/<org>/<bucket>/...` (or use `HuggingFaceBucketWriter`) and only promote the cleaned, ready-to-share data to `hf://datasets/...`. See [HF Storage Buckets](#hf-storage-buckets).

#### Advanced configuration

`shared_context` lets you inject shared state into every rollout invocation. It accepts:
@@ -454,6 +458,75 @@ JsonlWriter(
)
```

#### Where to write data on the Hugging Face Hub

For Hub-backed output, prefer **buckets for raw / intermediate data** and
**datasets for the published, ready-to-share version**. Both have dedicated writers:

```python
from datatrove.pipeline.writers import HuggingFaceBucketWriter, HuggingFaceDatasetWriter

# Recommended for raw / intermediate output of large pipelines:
HuggingFaceBucketWriter(
    bucket="myorg/my-bucket",
    prefix="v1/raw",  # path inside the bucket
    private=True,
    overwrite=True,  # delete existing files at prefix first (default: False = append)
)

# Use this when promoting the final dataset to a published HF dataset:
HuggingFaceDatasetWriter(
    dataset="myorg/my-dataset",
    private=True,
)
```

See [HF Storage Buckets](#hf-storage-buckets) for the full picture (including
direct fsspec writes, `hf-mount`, and HF Jobs volume mounts).
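
The bucket-versus-dataset convention above can be sketched as a small URL helper. This is only an illustration: `hub_output_url` and its stage names are hypothetical and not part of datatrove or `huggingface_hub`; only the `hf://buckets/...` and `hf://datasets/...` URL shapes come from this README.

```python
def hub_output_url(stage: str, repo_id: str, path: str = "") -> str:
    """Return an hf:// URL for a pipeline stage (hypothetical helper)."""
    # Raw and intermediate data go to a bucket; published data goes to a dataset.
    kinds = {"raw": "buckets", "intermediate": "buckets", "published": "datasets"}
    if stage not in kinds:
        raise ValueError(f"unknown stage: {stage!r}")
    url = f"hf://{kinds[stage]}/{repo_id}"
    # Normalise the optional sub-path so we never emit double slashes.
    path = path.strip("/")
    return f"{url}/{path}" if path else url


print(hub_output_url("raw", "myorg/my-bucket", "v1/raw"))
print(hub_output_url("published", "myorg/my-dataset"))
```

The returned string can be passed wherever a plain `str` path is accepted, for example as an `output_folder`.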

### HF Storage Buckets

Hugging Face [storage buckets](https://huggingface.co/docs/hub/storage-buckets) are
S3-like, mutable object storage backed by Xet. They are the recommended destination
for raw and intermediate data; promote the final, ready-to-publish version to a
dataset (`hf://datasets/...`).

You can read from and write to buckets in four ways — pick the one that fits your
deployment:

| Approach | When to use |
| --- | --- |
| `HuggingFaceBucketWriter` | Large datasets, staged Xet uploads, auto-create the bucket. Supports `overwrite=True` to replace existing files. |
| Direct fsspec path (`hf://buckets/...`) on any reader/writer | Simple read/write through `HfFileSystem`; no extra setup. |
| [`hf-mount`](https://huggingface.co/docs/hub/storage-buckets-mount) (FUSE/NFS) | Best read performance with zero code changes; treat the bucket like a local dir. |
| [HF Jobs volume mounts](https://huggingface.co/docs/huggingface_hub/main/guides/jobs#volume-mounts) | Zero setup when running on HF infra; the bucket is mounted at the path you choose. |

Reading uses the existing `ParquetReader` / `JsonlReader` blocks — no dedicated
bucket reader is needed because buckets are raw object storage. See
[`examples/bucket_synthetic_data.py`](examples/bucket_synthetic_data.py) for a
side-by-side comparison of all four approaches.
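
For the three path-based approaches, the only thing that changes in pipeline code is the string handed to the reader or writer. A minimal sketch of that mapping follows; `data_folder_for` is a hypothetical helper, and the `/mnt/<bucket>` and `/data` mount points are assumptions borrowed from the example script, not fixed locations.

```python
def data_folder_for(approach: str, org: str, bucket: str, subpath: str) -> str:
    """Return the data_folder string for one access approach (hypothetical helper)."""
    roots = {
        # Direct fsspec access through HfFileSystem.
        "fsspec": f"hf://buckets/{org}/{bucket}",
        # Assumed hf-mount mount point; any local directory would do.
        "hf-mount": f"/mnt/{bucket}",
        # Assumed HF Jobs volume mount target.
        "hf-jobs": "/data",
    }
    if approach not in roots:
        raise ValueError(f"unknown approach: {approach!r}")
    return f"{roots[approach]}/{subpath.strip('/')}/"


for name in ("fsspec", "hf-mount", "hf-jobs"):
    print(name, "->", data_folder_for(name, "my-org", "synth-data", "raw"))
```

`HuggingFaceBucketWriter` is left out of the mapping because it takes a `bucket` id and a `prefix` rather than a path.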

```python
from datatrove.pipeline.readers import ParquetReader
from datatrove.pipeline.writers import HuggingFaceBucketWriter

# Reading: any reader with an hf://buckets/... path works.
reader = ParquetReader(data_folder="hf://buckets/myorg/my-bucket/raw/")

# Writing: HuggingFaceBucketWriter stages files locally, then pushes via Xet.
# Set overwrite=True to replace existing files at the prefix (default: append).
writer = HuggingFaceBucketWriter(
    bucket="myorg/my-bucket",
    prefix="v1/filtered",
    private=True,
    cleanup=True,
    overwrite=True,
)
```
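
To make the `overwrite` semantics concrete, here is an in-memory sketch of the behaviour described in the comments above: the first upload clears the prefix once, and later uploads only add files. `FakeBucketWriter` is a hypothetical stand-in that uses a plain dict instead of a real bucket; the prefix matching rule (`prefix + "/"`) is an assumption and may differ from how the Hub lists a bucket.

```python
class FakeBucketWriter:
    """In-memory stand-in mimicking overwrite/append uploads (not the real writer)."""

    def __init__(self, store: dict[str, bytes], prefix: str = "", overwrite: bool = False):
        self.store = store  # maps in-bucket path -> file content
        self.prefix = prefix.strip("/")
        self.overwrite = overwrite
        self._overwrite_done = False

    def _remote_path(self, filename: str) -> str:
        # Files land under the prefix, or at the bucket root when no prefix is set.
        return f"{self.prefix}/{filename}" if self.prefix else filename

    def _under_prefix(self, path: str) -> bool:
        return not self.prefix or path.startswith(self.prefix + "/")

    def upload_files(self, files: dict[str, bytes]) -> None:
        # In overwrite mode, clear the prefix once, before the first upload only.
        if self.overwrite and not self._overwrite_done:
            for path in [p for p in self.store if self._under_prefix(p)]:
                del self.store[path]
            self._overwrite_done = True
        for name, data in files.items():
            self.store[self._remote_path(name)] = data


store = {"v1/filtered/data/old.parquet": b"old", "v1/raw/data/0.parquet": b"raw"}
writer = FakeBucketWriter(store, prefix="v1/filtered", overwrite=True)
writer.upload_files({"data/00000.parquet": b"a"})  # clears v1/filtered first
writer.upload_files({"data/00001.parquet": b"b"})  # no second deletion
print(sorted(store))
```

Because the deletion is tracked per writer object, it is worth checking how many writer instances a run creates. If each task constructs its own writer (an assumption about the executor, not something stated here), every instance would perform its own deletion pass.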

Bucket URLs also work as `logging_dir` on any executor, e.g.
`logging_dir="hf://buckets/myorg/my-bucket/logs/v1"`.

### Deduplicating data
For deduplication check the examples [minhash_deduplication.py](examples/minhash_deduplication.py), [sentence_deduplication.py](examples/sentence_deduplication.py) and [exact_substrings.py](examples/exact_substrings.py).

106 changes: 106 additions & 0 deletions examples/bucket_synthetic_data.py
@@ -0,0 +1,106 @@
"""Four ways to use Hugging Face storage buckets with datatrove.

Buckets (``hf://buckets/<org>/<bucket>``) are S3-like, mutable object storage
backed by Xet. They are the recommended destination for raw and intermediate
data in synthetic-data pipelines; promote to a Git-based dataset
(``hf://datasets/<org>/<dataset>``) once the data is ready to be published.

Run this file directly to see each approach printed; uncomment the
``executor.run()`` call you want to actually execute (you need an HF token with
bucket-write access).
"""

from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.filters import LambdaFilter
from datatrove.pipeline.readers import ParquetReader
from datatrove.pipeline.writers import HuggingFaceBucketWriter, ParquetWriter


ORG = "my-org"
BUCKET = "synth-data"
PREFIX = "v1"


# Approach A -- HuggingFaceBucketWriter (large datasets, staged Xet upload).
# Files are written locally first, then pushed to the bucket on rotation /
# close via ``batch_bucket_files``. Auto-creates the bucket on first upload.
def make_executor_with_bucket_writer() -> LocalPipelineExecutor:
    return LocalPipelineExecutor(
        pipeline=[
            ParquetReader(data_folder=f"hf://buckets/{ORG}/{BUCKET}/raw/"),
            LambdaFilter(lambda doc: len(doc.text) > 100),
            HuggingFaceBucketWriter(
                bucket=f"{ORG}/{BUCKET}",
                prefix=f"{PREFIX}/filtered",
                private=True,
                cleanup=True,
                overwrite=True,  # replace existing files at prefix (default: False = append)
            ),
        ],
        tasks=8,
        workers=4,
        logging_dir=f"hf://buckets/{ORG}/{BUCKET}/logs/{PREFIX}",
    )


# Approach B -- Direct fsspec paths (simple, small/medium datasets).
# ParquetWriter just opens an ``hf://buckets/...`` URL via HfFileSystem. Good
# for ad-hoc use; use Approach A when you need staged uploads or auto-create.
def make_executor_with_fsspec_path() -> LocalPipelineExecutor:
    return LocalPipelineExecutor(
        pipeline=[
            ParquetReader(data_folder=f"hf://buckets/{ORG}/{BUCKET}/raw/"),
            LambdaFilter(lambda doc: len(doc.text) > 100),
            ParquetWriter(output_folder=f"hf://buckets/{ORG}/{BUCKET}/{PREFIX}/filtered/"),
        ],
        tasks=8,
        workers=4,
        logging_dir=f"hf://buckets/{ORG}/{BUCKET}/logs/{PREFIX}",
    )


# Approach C -- hf-mount (zero code changes, best read performance).
# Mount the bucket once with the external Rust tool:
# $ hf-mount start bucket my-org/synth-data /mnt/synth-data
# Then point datatrove at plain local paths.
def make_executor_with_hf_mount() -> LocalPipelineExecutor:
    return LocalPipelineExecutor(
        pipeline=[
            ParquetReader(data_folder="/mnt/synth-data/raw/"),
            LambdaFilter(lambda doc: len(doc.text) > 100),
            ParquetWriter(output_folder=f"/mnt/synth-data/{PREFIX}/filtered/"),
        ],
        tasks=8,
        workers=4,
        logging_dir=f"/mnt/synth-data/logs/{PREFIX}",
    )


# Approach D -- HF Jobs volume mounts (zero setup on HF infra).
# Submit the job with a ``-v`` mount; the bucket is exposed at ``/data``:
# $ hf jobs run -v hf://buckets/my-org/synth-data:/data \
# python:3.12 python pipeline.py
def make_executor_with_hf_jobs_mount() -> LocalPipelineExecutor:
    return LocalPipelineExecutor(
        pipeline=[
            ParquetReader(data_folder="/data/raw/"),
            LambdaFilter(lambda doc: len(doc.text) > 100),
            ParquetWriter(output_folder=f"/data/{PREFIX}/filtered/"),
        ],
        tasks=8,
        workers=4,
        logging_dir=f"/data/logs/{PREFIX}",
    )


if __name__ == "__main__":
    print("Approach A -- HuggingFaceBucketWriter (staged Xet upload):")
    print(make_executor_with_bucket_writer())
    print("\nApproach B -- ParquetWriter on hf://buckets/... (direct fsspec):")
    print(make_executor_with_fsspec_path())
    print("\nApproach C -- hf-mount + plain local paths:")
    print(make_executor_with_hf_mount())
    print("\nApproach D -- HF Jobs volume mount:")
    print(make_executor_with_hf_jobs_mount())
    # Uncomment to actually run one of them (requires HF token):
    # make_executor_with_bucket_writer().run()
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -26,7 +26,7 @@ requires-python = ">=3.10.0"
 dependencies = [
     "dill>=0.3.0",
     "fsspec>=2023.12.2",
-    "huggingface-hub>=1.0.0", # use the v1 client line by default
+    "huggingface-hub>=1.5.0", # use the v1 client line by default; >=1.5.0 required for HF storage buckets
     "humanize",
     "loguru>=0.7.0",
     "multiprocess",
2 changes: 1 addition & 1 deletion src/datatrove/pipeline/writers/__init__.py
@@ -2,4 +2,4 @@
 from .parquet import ParquetWriter


-from .huggingface import HuggingFaceDatasetWriter # isort:skip
+from .huggingface import HuggingFaceBucketWriter, HuggingFaceDatasetWriter # isort:skip
33 changes: 23 additions & 10 deletions src/datatrove/pipeline/writers/disk_base.py
@@ -231,18 +231,31 @@ def close_file(self, filename):
             if not self.is_retryable_hf_hub_error(close_error) or not has_hf_upload_state:
                 raise
             # fsspec marks the file closed, but the temp file still exists on disk.
-            # Retry the HF upload_file call directly with exponential backoff.
+            # Retry the HF upload directly with exponential backoff. Buckets and
+            # repositories use different upload APIs so we dispatch on the resolved
+            # path type.
+            from huggingface_hub.hf_file_system import HfFileSystemResolvedBucketPath
+
             api = file_obj.fs._api

-            def _upload() -> None:
-                api.upload_file(
-                    path_or_fileobj=file_obj.temp_file.name,
-                    path_in_repo=file_obj.resolved_path.path_in_repo,
-                    repo_id=file_obj.resolved_path.repo_id,
-                    token=file_obj.fs.token,
-                    repo_type=file_obj.resolved_path.repo_type,
-                    revision=file_obj.resolved_path.revision,
-                )
+            if isinstance(file_obj.resolved_path, HfFileSystemResolvedBucketPath):
+
+                def _upload() -> None:
+                    api.batch_bucket_files(
+                        file_obj.resolved_path.bucket_id,
+                        add=[(file_obj.temp_file.name, file_obj.resolved_path.path)],
+                    )
+            else:
+
+                def _upload() -> None:
+                    api.upload_file(
+                        path_or_fileobj=file_obj.temp_file.name,
+                        path_in_repo=file_obj.resolved_path.path_in_repo,
+                        repo_id=file_obj.resolved_path.repo_id,
+                        token=file_obj.fs.token,
+                        repo_type=file_obj.resolved_path.repo_type,
+                        revision=file_obj.resolved_path.revision,
+                    )

             self._retry_hf_hub_operation(
                 operation_name="upload",
127 changes: 127 additions & 0 deletions src/datatrove/pipeline/writers/huggingface.py
@@ -6,8 +6,11 @@

from huggingface_hub import (
CommitOperationAdd,
batch_bucket_files,
create_bucket,
create_commit,
create_repo,
list_bucket_tree,
preupload_lfs_files,
)
from huggingface_hub.utils import HfHubHTTPError
@@ -136,3 +139,127 @@ def _on_file_switch(self, original_name, old_filename, new_filename):
        """
        super()._on_file_switch(original_name, old_filename, new_filename)
        self.upload_files(old_filename)


class HuggingFaceBucketWriter(ParquetWriter):
    """High-level writer for Hugging Face storage buckets.

    Buckets are mutable, S3-like object storage backed by Xet. Unlike the
    Git-based ``HuggingFaceDatasetWriter``, there are no commits or revisions:
    files are uploaded directly via ``batch_bucket_files``. This writer stages
    output locally and then pushes completed files to the bucket on rotation
    (``max_file_size``) and on ``close()``.

    Use this writer for very large raw / intermediate datasets. For published
    datasets, use ``HuggingFaceDatasetWriter`` instead.
    """

    default_output_filename: str = "data/${rank}.parquet"
    name = "🪣 HuggingFace Bucket"

    def __init__(
        self,
        bucket: str,
        prefix: str = "",
        private: bool = True,
        overwrite: bool = False,
        local_working_dir: DataFolderLike | None = None,
        output_filename: str = None,
        compression: Literal["snappy", "gzip", "brotli", "lz4", "zstd"] | None = "snappy",
        adapter: Callable = None,
        cleanup: bool = True,
        expand_metadata: bool = True,
        max_file_size: int = round(4.5 * 2**30),  # 4.5GB, leave some room for the last batch
        schema: Any = None,
        save_media_bytes: bool = False,
    ):
        """
        Args:
            bucket: bucket id, e.g. ``"myorg/my-bucket"`` or ``"my-bucket"``.
            prefix: optional path prefix inside the bucket (no leading or trailing ``/``).
            private: whether to create the bucket as private if it does not exist yet.
            overwrite: if ``True``, delete all existing files under ``prefix`` before the
                first upload. Only deletes once per writer instance (safe for multi-file
                pipelines). Default is ``False`` (append).
            local_working_dir: where files are staged before upload. Must be local.
                A ``TemporaryDirectory`` is used when not provided.
            output_filename: filename template, may contain ``${rank}`` and metadata tags.
            compression: parquet compression codec.
            adapter: optional adapter function ``(self, document) -> dict``.
            cleanup: delete each local staging file after successful upload.
            expand_metadata: store each metadata key in its own column.
            max_file_size: rotate to a new file when this size (bytes) is exceeded; -1 disables.
            schema: optional pyarrow schema.
            save_media_bytes: include raw media bytes in the parquet output.
        """
        self.bucket = bucket
        self.prefix = prefix.strip("/")
        self.private = private
        self.overwrite = overwrite
        # Hold the TemporaryDirectory so it survives until ``self`` is garbage-collected.
        self._local_working_tmpdir = tempfile.TemporaryDirectory() if local_working_dir is None else None
        self.local_working_dir = get_datafolder(
            local_working_dir if local_working_dir else self._local_working_tmpdir.name
        )
        self.cleanup = cleanup
        if not self.local_working_dir.is_local():
            raise ValueError("local_working_dir must be a local path")
        if os.environ.get("HF_HUB_ENABLE_HF_TRANSFER", "0") == "1":
            logger.warning(
                "You should now use xet for uploads.\nSee https://hf.co/docs/huggingface_hub/en/guides/download#faster-downloads\nexport HF_HUB_ENABLE_HF_TRANSFER=0"
            )
        super().__init__(
            output_folder=self.local_working_dir,
            output_filename=output_filename,
            compression=compression,
            adapter=adapter,
            expand_metadata=expand_metadata,
            max_file_size=max_file_size,
            schema=schema,
            save_media_bytes=save_media_bytes,
        )
        self._bucket_init = False
        self._overwrite_done = False

    def _remote_path(self, filename: str) -> str:
        """Compute the in-bucket path for a staging filename."""
        return f"{self.prefix}/{filename}" if self.prefix else filename

    def _delete_existing_files(self) -> None:
        """Delete all existing files under ``self.prefix`` in the bucket (overwrite mode)."""
        existing = [
            entry.path
            for entry in list_bucket_tree(self.bucket, prefix=self.prefix or None, recursive=True)
            if getattr(entry, "type", None) == "file"
        ]
        if existing:
            logger.info(f"Overwrite mode: deleting {len(existing)} existing files at {self.prefix!r}")
            batch_bucket_files(self.bucket, delete=existing)

    def upload_files(self, *filenames: str) -> None:
        """Upload one or more staged files to the bucket via Xet, then optionally clean them up."""
        if not self._bucket_init:
            create_bucket(self.bucket, private=self.private, exist_ok=True)
            self._bucket_init = True
        if self.overwrite and not self._overwrite_done:
            self._delete_existing_files()
            self._overwrite_done = True
        add = [(self.local_working_dir.resolve_paths(filename), self._remote_path(filename)) for filename in filenames]
        logger.info(f"Uploading {','.join(filenames)} to bucket {self.bucket}...")
        batch_bucket_files(self.bucket, add=add)
        logger.info(f"Upload of {','.join(filenames)} to bucket {self.bucket} complete!")
        if self.cleanup:
            for filename in filenames:
                self.local_working_dir.rm(filename)

    def close(self, rank: int = 0) -> None:
        filelist = list(self.output_mg.get_open_files().keys())
        super().close()
        if filelist:
            logger.info(f"Starting upload of {len(filelist)} files to bucket {self.bucket}")
            self.upload_files(*filelist)

    def _on_file_switch(self, original_name, old_filename, new_filename):
        """When ``max_file_size`` triggers a rotation, upload the completed file immediately."""
        super()._on_file_switch(original_name, old_filename, new_filename)
        self.upload_files(old_filename)