diff --git a/README.md b/README.md index 9b5f15a1e..623d2899a 100644 --- a/README.md +++ b/README.md @@ -306,7 +306,9 @@ To explicitly enable or disable colorization, you may set the following environm Datatrove supports a wide variety of input/output sources through [fsspec](https://filesystem-spec.readthedocs.io/en/latest/). There are a few ways to provide a path to a datatrove block (for `input_folder`, `logging_dir`, `data_folder` and so on arguments): -- `str`: the simplest way is to pass a single string. Example: `/home/user/mydir`, `s3://mybucket/myinputdata`, `hf://datasets/allenai/c4/en/` +- `str`: the simplest way is to pass a single string. Example: `/home/user/mydir`, `s3://mybucket/myinputdata`, `hf://buckets/myorg/my-bucket/raw/`, `hf://datasets/allenai/c4/en/` + +> Use `hf://buckets/...` for raw and intermediate data (S3-like, mutable, no versioning). Use `hf://datasets/...` for datasets ready to be published. See [HF Storage Buckets](#hf-storage-buckets) below. - `(str, fsspec filesystem instance)`: a string path and a fully initialized filesystem object. Example: `("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))` - `(str, dict)`: a string path and a dictionary with options to initialize a fs. Example (equivalent to the previous line): `("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})` @@ -352,6 +354,8 @@ Set `rollouts_per_document` to automatically run the same rollout multiple times For a ready-to-use script for synthetic data generation at scale (supporting models from 1B to 1T parameters, local/SLURM execution, and multi-node setups), see [`generate_data.py`](examples/inference/generate_data.py). This script handles prompt-based generation with configurable system prompts and templates. +> For raw generation output, write to `hf://buckets/<org>/<bucket>/...` (or use `HuggingFaceBucketWriter`) and only promote the cleaned, ready-to-share data to `hf://datasets/...`. 
See [HF Storage Buckets](#hf-storage-buckets). + #### Advanced configuration `shared_context` lets you inject shared state into every rollout invocation. It accepts: @@ -454,6 +458,75 @@ JsonlWriter( ) ``` +#### Where to write data on the Hugging Face Hub + +For Hub-backed output, prefer **buckets for raw / intermediate data** and +**datasets for the published, ready-to-share version**. Both have dedicated writers: + +```python +from datatrove.pipeline.writers import HuggingFaceBucketWriter, HuggingFaceDatasetWriter + +# Recommended for raw / intermediate output of large pipelines: +HuggingFaceBucketWriter( + bucket="myorg/my-bucket", + prefix="v1/raw", # path inside the bucket + private=True, + overwrite=True, # delete existing files at prefix first (default: False = append) +) + +# Use this when promoting the final dataset to a published HF dataset: +HuggingFaceDatasetWriter( + dataset="myorg/my-dataset", + private=True, +) +``` + +See [HF Storage Buckets](#hf-storage-buckets) for the full picture (including +direct fsspec writes, `hf-mount`, and HF Jobs volume mounts). + +### HF Storage Buckets + +Hugging Face [storage buckets](https://huggingface.co/docs/hub/storage-buckets) are +S3-like, mutable object storage backed by Xet. They are the recommended destination +for raw and intermediate data; promote the final, ready-to-publish version to a +dataset (`hf://datasets/...`). + +You can read from and write to buckets in four ways — pick the one that fits your +deployment: + +| Approach | When to use | +| --- | --- | +| `HuggingFaceBucketWriter` | Large datasets, staged Xet uploads, auto-create the bucket. Supports `overwrite=True` to replace existing files. | +| Direct fsspec path (`hf://buckets/...`) on any reader/writer | Simple read/write through `HfFileSystem`; no extra setup. | +| [`hf-mount`](https://huggingface.co/docs/hub/storage-buckets-mount) (FUSE/NFS) | Best read performance with zero code changes; treat the bucket like a local dir. 
| +| [HF Jobs volume mounts](https://huggingface.co/docs/huggingface_hub/main/guides/jobs#volume-mounts) | Zero setup when running on HF infra; the bucket is mounted at the path you choose. | + +Reading uses the existing `ParquetReader` / `JsonlReader` blocks — no dedicated +bucket reader is needed because buckets are raw object storage. See +[`examples/bucket_synthetic_data.py`](examples/bucket_synthetic_data.py) for a +side-by-side comparison of all four approaches. + +```python +from datatrove.pipeline.readers import ParquetReader +from datatrove.pipeline.writers import HuggingFaceBucketWriter + +# Reading: any reader with an hf://buckets/... path works. +reader = ParquetReader(data_folder="hf://buckets/myorg/my-bucket/raw/") + +# Writing: HuggingFaceBucketWriter stages files locally, then pushes via Xet. +# Set overwrite=True to replace existing files at the prefix (default: append). +writer = HuggingFaceBucketWriter( + bucket="myorg/my-bucket", + prefix="v1/filtered", + private=True, + cleanup=True, + overwrite=True, +) +``` + +Bucket URLs also work as `logging_dir` on any executor, e.g. +`logging_dir="hf://buckets/myorg/my-bucket/logs/v1"`. + ### Deduplicating data For deduplication check the examples [minhash_deduplication.py](examples/minhash_deduplication.py), [sentence_deduplication.py](examples/sentence_deduplication.py) and [exact_substrings.py](examples/exact_substrings.py). diff --git a/examples/bucket_synthetic_data.py b/examples/bucket_synthetic_data.py new file mode 100644 index 000000000..b2507077d --- /dev/null +++ b/examples/bucket_synthetic_data.py @@ -0,0 +1,106 @@ +"""Four ways to use Hugging Face storage buckets with datatrove. + +Buckets (``hf://buckets/<org>/<bucket>``) are S3-like, mutable object storage +backed by Xet. They are the recommended destination for raw and intermediate +data in synthetic-data pipelines; promote to a Git-based dataset +(``hf://datasets/<org>/<dataset>``) once the data is ready to be published. 
+ +Run this file directly to see each approach printed; uncomment the +``executor.run()`` call you want to actually execute (you need an HF token with +bucket-write access). +""" + +from datatrove.executor import LocalPipelineExecutor +from datatrove.pipeline.filters import LambdaFilter +from datatrove.pipeline.readers import ParquetReader +from datatrove.pipeline.writers import HuggingFaceBucketWriter, ParquetWriter + + +ORG = "my-org" +BUCKET = "synth-data" +PREFIX = "v1" + + +# Approach A -- HuggingFaceBucketWriter (large datasets, staged Xet upload). +# Files are written locally first, then pushed to the bucket on rotation / +# close via ``batch_bucket_files``. Auto-creates the bucket on first upload. +def make_executor_with_bucket_writer() -> LocalPipelineExecutor: + return LocalPipelineExecutor( + pipeline=[ + ParquetReader(data_folder=f"hf://buckets/{ORG}/{BUCKET}/raw/"), + LambdaFilter(lambda doc: len(doc.text) > 100), + HuggingFaceBucketWriter( + bucket=f"{ORG}/{BUCKET}", + prefix=f"{PREFIX}/filtered", + private=True, + cleanup=True, + overwrite=True, # replace existing files at prefix (default: False = append) + ), + ], + tasks=8, + workers=4, + logging_dir=f"hf://buckets/{ORG}/{BUCKET}/logs/{PREFIX}", + ) + + +# Approach B -- Direct fsspec paths (simple, small/medium datasets). +# ParquetWriter just opens an ``hf://buckets/...`` URL via HfFileSystem. Good +# for ad-hoc use; use Approach A when you need staged uploads or auto-create. +def make_executor_with_fsspec_path() -> LocalPipelineExecutor: + return LocalPipelineExecutor( + pipeline=[ + ParquetReader(data_folder=f"hf://buckets/{ORG}/{BUCKET}/raw/"), + LambdaFilter(lambda doc: len(doc.text) > 100), + ParquetWriter(output_folder=f"hf://buckets/{ORG}/{BUCKET}/{PREFIX}/filtered/"), + ], + tasks=8, + workers=4, + logging_dir=f"hf://buckets/{ORG}/{BUCKET}/logs/{PREFIX}", + ) + + +# Approach C -- hf-mount (zero code changes, best read performance). 
+# Mount the bucket once with the external Rust tool: +# $ hf-mount start bucket my-org/synth-data /mnt/synth-data +# Then point datatrove at plain local paths. +def make_executor_with_hf_mount() -> LocalPipelineExecutor: + return LocalPipelineExecutor( + pipeline=[ + ParquetReader(data_folder="/mnt/synth-data/raw/"), + LambdaFilter(lambda doc: len(doc.text) > 100), + ParquetWriter(output_folder=f"/mnt/synth-data/{PREFIX}/filtered/"), + ], + tasks=8, + workers=4, + logging_dir=f"/mnt/synth-data/logs/{PREFIX}", + ) + + +# Approach D -- HF Jobs volume mounts (zero setup on HF infra). +# Submit the job with a ``-v`` mount; the bucket is exposed at ``/data``: +# $ hf jobs run -v hf://buckets/my-org/synth-data:/data \ +# python:3.12 python pipeline.py +def make_executor_with_hf_jobs_mount() -> LocalPipelineExecutor: + return LocalPipelineExecutor( + pipeline=[ + ParquetReader(data_folder="/data/raw/"), + LambdaFilter(lambda doc: len(doc.text) > 100), + ParquetWriter(output_folder=f"/data/{PREFIX}/filtered/"), + ], + tasks=8, + workers=4, + logging_dir=f"/data/logs/{PREFIX}", + ) + + +if __name__ == "__main__": + print("Approach A -- HuggingFaceBucketWriter (staged Xet upload):") + print(make_executor_with_bucket_writer()) + print("\nApproach B -- ParquetWriter on hf://buckets/... 
(direct fsspec):") + print(make_executor_with_fsspec_path()) + print("\nApproach C -- hf-mount + plain local paths:") + print(make_executor_with_hf_mount()) + print("\nApproach D -- HF Jobs volume mount:") + print(make_executor_with_hf_jobs_mount()) + # Uncomment to actually run one of them (requires HF token): + # make_executor_with_bucket_writer().run() diff --git a/pyproject.toml b/pyproject.toml index 18ce76c8a..a20943e14 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ requires-python = ">=3.10.0" dependencies = [ "dill>=0.3.0", "fsspec>=2023.12.2", - "huggingface-hub>=1.0.0", # use the v1 client line by default + "huggingface-hub>=1.5.0", # use the v1 client line by default; >=1.5.0 required for HF storage buckets "humanize", "loguru>=0.7.0", "multiprocess", diff --git a/src/datatrove/pipeline/writers/__init__.py b/src/datatrove/pipeline/writers/__init__.py index 512f56c78..366386f89 100644 --- a/src/datatrove/pipeline/writers/__init__.py +++ b/src/datatrove/pipeline/writers/__init__.py @@ -2,4 +2,4 @@ from .parquet import ParquetWriter -from .huggingface import HuggingFaceDatasetWriter # isort:skip +from .huggingface import HuggingFaceBucketWriter, HuggingFaceDatasetWriter # isort:skip diff --git a/src/datatrove/pipeline/writers/disk_base.py b/src/datatrove/pipeline/writers/disk_base.py index 039e0a02a..96188205d 100644 --- a/src/datatrove/pipeline/writers/disk_base.py +++ b/src/datatrove/pipeline/writers/disk_base.py @@ -231,18 +231,31 @@ def close_file(self, filename): if not self.is_retryable_hf_hub_error(close_error) or not has_hf_upload_state: raise # fsspec marks the file closed, but the temp file still exists on disk. - # Retry the HF upload_file call directly with exponential backoff. + # Retry the HF upload directly with exponential backoff. Buckets and + # repositories use different upload APIs so we dispatch on the resolved + # path type. 
+ from huggingface_hub.hf_file_system import HfFileSystemResolvedBucketPath + api = file_obj.fs._api - def _upload() -> None: - api.upload_file( - path_or_fileobj=file_obj.temp_file.name, - path_in_repo=file_obj.resolved_path.path_in_repo, - repo_id=file_obj.resolved_path.repo_id, - token=file_obj.fs.token, - repo_type=file_obj.resolved_path.repo_type, - revision=file_obj.resolved_path.revision, - ) + if isinstance(file_obj.resolved_path, HfFileSystemResolvedBucketPath): + + def _upload() -> None: + api.batch_bucket_files( + file_obj.resolved_path.bucket_id, + add=[(file_obj.temp_file.name, file_obj.resolved_path.path)], + ) + else: + + def _upload() -> None: + api.upload_file( + path_or_fileobj=file_obj.temp_file.name, + path_in_repo=file_obj.resolved_path.path_in_repo, + repo_id=file_obj.resolved_path.repo_id, + token=file_obj.fs.token, + repo_type=file_obj.resolved_path.repo_type, + revision=file_obj.resolved_path.revision, + ) self._retry_hf_hub_operation( operation_name="upload", diff --git a/src/datatrove/pipeline/writers/huggingface.py b/src/datatrove/pipeline/writers/huggingface.py index 76d9ec7e5..b3e06dc6d 100644 --- a/src/datatrove/pipeline/writers/huggingface.py +++ b/src/datatrove/pipeline/writers/huggingface.py @@ -6,8 +6,11 @@ from huggingface_hub import ( CommitOperationAdd, + batch_bucket_files, + create_bucket, create_commit, create_repo, + list_bucket_tree, preupload_lfs_files, ) from huggingface_hub.utils import HfHubHTTPError @@ -136,3 +139,127 @@ def _on_file_switch(self, original_name, old_filename, new_filename): """ super()._on_file_switch(original_name, old_filename, new_filename) self.upload_files(old_filename) + + +class HuggingFaceBucketWriter(ParquetWriter): + """High-level writer for Hugging Face storage buckets. + + Buckets are mutable, S3-like object storage backed by Xet. Unlike the + Git-based ``HuggingFaceDatasetWriter``, there are no commits or revisions: + files are uploaded directly via ``batch_bucket_files``. 
This writer stages + output locally and then pushes completed files to the bucket on rotation + (``max_file_size``) and on ``close()``. + + Use this writer for very large raw / intermediate datasets. For published + datasets, use ``HuggingFaceDatasetWriter`` instead. + """ + + default_output_filename: str = "data/${rank}.parquet" + name = "🪣 HuggingFace Bucket" + + def __init__( + self, + bucket: str, + prefix: str = "", + private: bool = True, + overwrite: bool = False, + local_working_dir: DataFolderLike | None = None, + output_filename: str = None, + compression: Literal["snappy", "gzip", "brotli", "lz4", "zstd"] | None = "snappy", + adapter: Callable = None, + cleanup: bool = True, + expand_metadata: bool = True, + max_file_size: int = round(4.5 * 2**30), # 4.5GB, leave some room for the last batch + schema: Any = None, + save_media_bytes: bool = False, + ): + """ + Args: + bucket: bucket id, e.g. ``"myorg/my-bucket"`` or ``"my-bucket"``. + prefix: optional path prefix inside the bucket (no leading or trailing ``/``). + private: whether to create the bucket as private if it does not exist yet. + overwrite: if ``True``, delete all existing files under ``prefix`` before the + first upload. Only deletes once per writer instance (safe for multi-file + pipelines). Default is ``False`` (append). + local_working_dir: where files are staged before upload. Must be local. + A ``TemporaryDirectory`` is used when not provided. + output_filename: filename template, may contain ``${rank}`` and metadata tags. + compression: parquet compression codec. + adapter: optional adapter function ``(self, document) -> dict``. + cleanup: delete each local staging file after successful upload. + expand_metadata: store each metadata key in its own column. + max_file_size: rotate to a new file when this size (bytes) is exceeded; -1 disables. + schema: optional pyarrow schema. + save_media_bytes: include raw media bytes in the parquet output. 
+ """ + self.bucket = bucket + self.prefix = prefix.strip("/") + self.private = private + self.overwrite = overwrite + # Hold the TemporaryDirectory so it survives until ``self`` is garbage-collected. + self._local_working_tmpdir = tempfile.TemporaryDirectory() if local_working_dir is None else None + self.local_working_dir = get_datafolder( + local_working_dir if local_working_dir else self._local_working_tmpdir.name + ) + self.cleanup = cleanup + if not self.local_working_dir.is_local(): + raise ValueError("local_working_dir must be a local path") + if os.environ.get("HF_HUB_ENABLE_HF_TRANSFER", "0") == "1": + logger.warning( + "You should now use xet for uploads.\nSee https://hf.co/docs/huggingface_hub/en/guides/download#faster-downloads\nexport HF_HUB_ENABLE_HF_TRANSFER=0" + ) + super().__init__( + output_folder=self.local_working_dir, + output_filename=output_filename, + compression=compression, + adapter=adapter, + expand_metadata=expand_metadata, + max_file_size=max_file_size, + schema=schema, + save_media_bytes=save_media_bytes, + ) + self._bucket_init = False + self._overwrite_done = False + + def _remote_path(self, filename: str) -> str: + """Compute the in-bucket path for a staging filename.""" + return f"{self.prefix}/{filename}" if self.prefix else filename + + def _delete_existing_files(self) -> None: + """Delete all existing files under ``self.prefix`` in the bucket (overwrite mode).""" + existing = [ + entry.path + for entry in list_bucket_tree(self.bucket, prefix=self.prefix or None, recursive=True) + if getattr(entry, "type", None) == "file" + ] + if existing: + logger.info(f"Overwrite mode: deleting {len(existing)} existing files at {self.prefix!r}") + batch_bucket_files(self.bucket, delete=existing) + + def upload_files(self, *filenames: str) -> None: + """Upload one or more staged files to the bucket via Xet, then optionally clean them up.""" + if not self._bucket_init: + create_bucket(self.bucket, private=self.private, exist_ok=True) + 
self._bucket_init = True + if self.overwrite and not self._overwrite_done: + self._delete_existing_files() + self._overwrite_done = True + add = [(self.local_working_dir.resolve_paths(filename), self._remote_path(filename)) for filename in filenames] + logger.info(f"Uploading {','.join(filenames)} to bucket {self.bucket}...") + batch_bucket_files(self.bucket, add=add) + logger.info(f"Upload of {','.join(filenames)} to bucket {self.bucket} complete!") + if self.cleanup: + for filename in filenames: + self.local_working_dir.rm(filename) + + def close(self, rank: int = 0) -> None: + filelist = list(self.output_mg.get_open_files().keys()) + super().close() + if filelist: + logger.info(f"Starting upload of {len(filelist)} files to bucket {self.bucket}") + self.upload_files(*filelist) + + def _on_file_switch(self, original_name, old_filename, new_filename): + """When ``max_file_size`` triggers a rotation, upload the completed file immediately.""" + super()._on_file_switch(original_name, old_filename, new_filename) + self.upload_files(old_filename) diff --git a/tests/manual/README.md b/tests/manual/README.md new file mode 100644 index 000000000..44205ec78 --- /dev/null +++ b/tests/manual/README.md @@ -0,0 +1,16 @@ +# Manual tests + +Scripts in this folder hit the real Hugging Face Hub (and optionally Slurm) and +are therefore **not** run by `pytest` in CI. + +Common requirements: + +- `HF_TOKEN` environment variable set to a token with bucket-create / bucket-write + scope for the org you target. +- Override the bucket id with `--bucket myorg/test-bucket` (default uses + `$USER` and a UUID suffix). +- Slurm scripts additionally require a usable Slurm cluster and the right + `--partition` / `--qos` flags. + +Each script prints a clear `PASS` / `FAIL` line at the end and cleans up the +test bucket on success. 
diff --git a/tests/manual/__init__.py b/tests/manual/__init__.py new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/tests/manual/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/manual/test_bucket_logging_dir.py b/tests/manual/test_bucket_logging_dir.py new file mode 100644 index 000000000..5fe72de5d --- /dev/null +++ b/tests/manual/test_bucket_logging_dir.py @@ -0,0 +1,87 @@ +"""Live test for using ``hf://buckets/...`` as ``logging_dir``. + +Run manually: + + python tests/manual/test_bucket_logging_dir.py --bucket myorg/test-datatrove-logs + +Verifies that ``LocalPipelineExecutor`` can stream completion files, stats and +``stats.json`` to a bucket-backed ``logging_dir``, and that ``skip_completed`` +correctly short-circuits a re-run. +""" + +from __future__ import annotations + +import argparse +import getpass +import sys +import tempfile +import uuid + +from huggingface_hub import HfFileSystem, create_bucket, delete_bucket + +from datatrove.data import Document +from datatrove.executor.local import LocalPipelineExecutor +from datatrove.pipeline.filters import LambdaFilter +from datatrove.pipeline.writers.jsonl import JsonlWriter + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--bucket", + default=f"{getpass.getuser()}/test-datatrove-{uuid.uuid4().hex[:8]}", + ) + parser.add_argument("--tasks", type=int, default=4) + parser.add_argument("--workers", type=int, default=2) + args = parser.parse_args() + + # Bucket must exist before using it as logging_dir. 
+ create_bucket(args.bucket, private=True, exist_ok=True) + + logging_url = f"hf://buckets/{args.bucket}/logs/run1" + print(f"Running pipeline with logging_dir={logging_url}") + + docs = [Document(text=f"hello {i}", id=f"doc-{i}") for i in range(50)] + + with tempfile.TemporaryDirectory() as out_dir: + executor = LocalPipelineExecutor( + pipeline=[ + docs, + LambdaFilter(lambda doc: True), + JsonlWriter(output_folder=out_dir), + ], + tasks=args.tasks, + workers=args.workers, + logging_dir=logging_url, + ) + executor.run() + + # Confirm completions and stats made it to the bucket. + fs = HfFileSystem() + completions = fs.find(f"buckets/{args.bucket}/logs/run1/completions", maxdepth=2) + stats = fs.find(f"buckets/{args.bucket}/logs/run1/stats", maxdepth=2) + # stats/ contains per-task files plus a stats.json aggregate + if len(completions) != args.tasks or len(stats) < args.tasks: + print( + f"FAIL: expected {args.tasks} completion files and >= {args.tasks} stats files; " + f"got {len(completions)} completions, {len(stats)} stats" + ) + return 1 + + # Re-running with skip_completed=True should be a no-op. + executor2 = LocalPipelineExecutor( + pipeline=[docs, JsonlWriter(output_folder=out_dir)], + tasks=args.tasks, + workers=args.workers, + logging_dir=logging_url, + skip_completed=True, + ) + executor2.run() + + delete_bucket(args.bucket, missing_ok=True) + print(f"PASS: bucket-backed logging_dir works for {args.tasks} tasks") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/manual/test_bucket_slurm_pipeline.py b/tests/manual/test_bucket_slurm_pipeline.py new file mode 100644 index 000000000..a1ee720a6 --- /dev/null +++ b/tests/manual/test_bucket_slurm_pipeline.py @@ -0,0 +1,82 @@ +"""Live Slurm test: write parquet shards to a bucket, then read them back. 
+ +Run manually from a login node (do **not** submit from inside a Slurm task): + + python tests/manual/test_bucket_slurm_pipeline.py \ + --bucket myorg/test-datatrove-slurm \ + --partition <partition> + +Verifies that ``SlurmPipelineExecutor`` can write parquet shards to a bucket +via ``ParquetWriter(output_folder="hf://buckets/...")`` and that all shards +appear under the expected prefix. +""" + +from __future__ import annotations + +import argparse +import getpass +import sys +import uuid + +from huggingface_hub import HfFileSystem, delete_bucket + +from datatrove.data import Document +from datatrove.executor.slurm import SlurmPipelineExecutor +from datatrove.pipeline.readers.parquet import ParquetReader +from datatrove.pipeline.writers.parquet import ParquetWriter + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--bucket", + default=f"{getpass.getuser()}/test-datatrove-{uuid.uuid4().hex[:8]}", + ) + parser.add_argument("--partition", required=True) + parser.add_argument("--qos", default="normal") + parser.add_argument("--tasks", type=int, default=8) + parser.add_argument("--workers", type=int, default=4) + parser.add_argument("--n_docs_per_task", type=int, default=10) + args = parser.parse_args() + + docs = [Document(text=f"hello {i}", id=f"doc-{i}") for i in range(args.tasks * args.n_docs_per_task)] + output_url = f"hf://buckets/{args.bucket}/v1/parquet" + logs_url = f"hf://buckets/{args.bucket}/logs/v1" + + print(f"Submitting Slurm job (tasks={args.tasks}, workers={args.workers})") + executor = SlurmPipelineExecutor( + job_name="datatrove-bucket-test", + pipeline=[ + docs, + ParquetWriter(output_folder=output_url), + ], + tasks=args.tasks, + workers=args.workers, + time="00:20:00", + partition=args.partition, + qos=args.qos, + logging_dir=logs_url, + cpus_per_task=2, + mem_per_cpu_gb=2, + ) + executor.run() + + fs = HfFileSystem() + shards = fs.find(f"buckets/{args.bucket}/v1/parquet", maxdepth=2) + if len(shards) 
!= args.tasks: + print(f"FAIL: expected {args.tasks} parquet shards in bucket, got {len(shards)}") + return 1 + + reader = ParquetReader(data_folder=output_url) + read_back = list(reader()) + if len(read_back) != len(docs): + print(f"FAIL: read {len(read_back)} docs (expected {len(docs)})") + return 1 + + delete_bucket(args.bucket, missing_ok=True) + print(f"PASS: Slurm pipeline wrote {len(docs)} docs across {args.tasks} shards to {args.bucket}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/manual/test_bucket_writer_live.py b/tests/manual/test_bucket_writer_live.py new file mode 100644 index 000000000..cf381d4d6 --- /dev/null +++ b/tests/manual/test_bucket_writer_live.py @@ -0,0 +1,68 @@ +"""Live end-to-end test for ``HuggingFaceBucketWriter``. + +Run manually: + + python tests/manual/test_bucket_writer_live.py --bucket myorg/test-datatrove-bucket + +Requires ``HF_TOKEN`` with bucket-write scope. The bucket is created if missing +and deleted on success. 
+""" + +from __future__ import annotations + +import argparse +import getpass +import sys +import uuid + +from huggingface_hub import delete_bucket + +from datatrove.data import Document +from datatrove.pipeline.readers.parquet import ParquetReader +from datatrove.pipeline.writers import HuggingFaceBucketWriter + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--bucket", + default=f"{getpass.getuser()}/test-datatrove-{uuid.uuid4().hex[:8]}", + help="Bucket id to use for the test (will be auto-created).", + ) + parser.add_argument("--n_docs", type=int, default=100) + parser.add_argument("--prefix", default="round-trip") + args = parser.parse_args() + + docs = [Document(text=f"hello world {i}", id=f"doc-{i}", metadata={"score": float(i)}) for i in range(args.n_docs)] + + print(f"Writing {args.n_docs} docs to bucket={args.bucket} prefix={args.prefix}") + writer = HuggingFaceBucketWriter( + bucket=args.bucket, + prefix=args.prefix, + private=True, + cleanup=True, + ) + with writer: + for doc in docs: + writer.write(doc) + + bucket_url = f"hf://buckets/{args.bucket}/{args.prefix}/" + print(f"Reading back from {bucket_url}") + reader = ParquetReader(data_folder=bucket_url) + read_back = list(reader()) + + ok = len(read_back) == args.n_docs and all( + rd.text == orig.text and rd.id == orig.id for rd, orig in zip(read_back, docs) + ) + + if ok: + delete_bucket(args.bucket, missing_ok=True) + print(f"PASS: round-trip of {args.n_docs} docs through bucket {args.bucket}") + return 0 + + print(f"FAIL: read back {len(read_back)} docs (expected {args.n_docs})") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/pipeline/writers/test_disk_writer.py b/tests/pipeline/writers/test_disk_writer.py index 3669e6377..002491473 100644 --- a/tests/pipeline/writers/test_disk_writer.py +++ b/tests/pipeline/writers/test_disk_writer.py @@ -102,6 +102,38 @@ def close(self) -> None: raise 
RuntimeError(self._close_error_message) +class _FlakyBucketApi: + """Fake HfApi that records `batch_bucket_files` calls and fails the first N times.""" + + def __init__(self, failures: int, error_message: str) -> None: + self.failures = failures + self.error_message = error_message + self.calls: list[dict[str, Any]] = [] + + def batch_bucket_files(self, bucket_id: str, *, add=None, **_kwargs: Any) -> None: + self.calls.append({"bucket_id": bucket_id, "add": add}) + if len(self.calls) <= self.failures: + raise RuntimeError(self.error_message) + + +class _FailingCloseBucketFile: + """Fake fsspec output file whose `resolved_path` looks like an HfFileSystemResolvedBucketPath.""" + + def __init__(self, upload_api: _FlakyBucketApi, close_error_message: str) -> None: + from huggingface_hub.hf_file_system import HfFileSystemResolvedBucketPath + + self.fs = SimpleNamespace(_api=upload_api, token="fake-token") + self.temp_file = SimpleNamespace(name="/tmp/fake-bucket-upload.tmp") + self.resolved_path = HfFileSystemResolvedBucketPath( + bucket_id="org/my-bucket", + path="data/00000.parquet", + ) + self._close_error_message = close_error_message + + def close(self) -> None: + raise RuntimeError(self._close_error_message) + + class _CloseFileOutputManager: def __init__(self, file_obj: _FailingCloseFile) -> None: self._file_obj = file_obj @@ -231,6 +263,79 @@ def test_close_file_retries_transient_hf_upload_errors(self) -> None: self.assertEqual(upload_api.calls, 3) + def test_close_file_bucket_retry_on_transient_error(self) -> None: + """Bucket-backed files must retry uploads via batch_bucket_files (not upload_file).""" + writer = _TextTestDiskWriter(output_folder=self.tmp_dir) + upload_api = _FlakyBucketApi(failures=2, error_message="A commit has happened since") + file_obj = _FailingCloseBucketFile( + upload_api=upload_api, + close_error_message="A commit has happened since", + ) + writer.output_mg = _CloseFileOutputManager(file_obj=file_obj) + + with ( + 
patch("datatrove.pipeline.writers.disk_base.time.sleep", return_value=None), + patch("datatrove.pipeline.writers.disk_base.os.path.exists", return_value=False), + patch("datatrove.pipeline.writers.disk_base.os.remove", return_value=None), + ): + writer.close_file("data/00000.parquet") + + self.assertEqual(len(upload_api.calls), 3) + last_call = upload_api.calls[-1] + self.assertEqual(last_call["bucket_id"], "org/my-bucket") + self.assertEqual(last_call["add"], [(file_obj.temp_file.name, "data/00000.parquet")]) + + def test_close_file_bucket_non_retryable_error_raises(self) -> None: + writer = _TextTestDiskWriter(output_folder=self.tmp_dir) + upload_api = _FlakyBucketApi(failures=0, error_message="should not be called") + file_obj = _FailingCloseBucketFile( + upload_api=upload_api, + close_error_message="Permission denied", + ) + writer.output_mg = _CloseFileOutputManager(file_obj=file_obj) + + with patch("datatrove.pipeline.writers.disk_base.time.sleep", return_value=None): + with self.assertRaisesRegex(RuntimeError, "Permission denied"): + writer.close_file("data/00000.parquet") + self.assertEqual(upload_api.calls, []) + + def test_close_file_bucket_max_retries_exhausted(self) -> None: + writer = _TextTestDiskWriter(output_folder=self.tmp_dir) + # Always-failing API exceeds HF_MAX_RETRIES. 
+ upload_api = _FlakyBucketApi( + failures=writer.HF_MAX_RETRIES + 1, + error_message="Too Many Requests", + ) + file_obj = _FailingCloseBucketFile( + upload_api=upload_api, + close_error_message="Too Many Requests", + ) + writer.output_mg = _CloseFileOutputManager(file_obj=file_obj) + + with patch("datatrove.pipeline.writers.disk_base.time.sleep", return_value=None): + with self.assertRaisesRegex(RuntimeError, "Too Many Requests"): + writer.close_file("data/00000.parquet") + self.assertEqual(len(upload_api.calls), writer.HF_MAX_RETRIES) + + def test_close_file_repo_path_unchanged(self) -> None: + """Regression test: existing repo-path retry path must still call upload_file.""" + writer = _TextTestDiskWriter(output_folder=self.tmp_dir) + upload_api = _FlakyUploadApi(failures=1, error_message="rate limit") + file_obj = _FailingCloseFile( + upload_api=upload_api, + close_error_message="rate limit", + ) + writer.output_mg = _CloseFileOutputManager(file_obj=file_obj) + + with ( + patch("datatrove.pipeline.writers.disk_base.time.sleep", return_value=None), + patch("datatrove.pipeline.writers.disk_base.os.path.exists", return_value=False), + patch("datatrove.pipeline.writers.disk_base.os.remove", return_value=None), + ): + writer.close_file("00000.txt") + + self.assertEqual(upload_api.calls, 2) + def test_huggingface_close_retries_on_503_without_server_message(self) -> None: writer = object.__new__(HuggingFaceDatasetWriter) writer.output_mg = SimpleNamespace(get_open_files=lambda: {}) diff --git a/tests/pipeline/writers/test_huggingface_bucket.py b/tests/pipeline/writers/test_huggingface_bucket.py new file mode 100644 index 000000000..615a58ec2 --- /dev/null +++ b/tests/pipeline/writers/test_huggingface_bucket.py @@ -0,0 +1,277 @@ +"""Tests for HuggingFaceBucketWriter. + +Unit tests use ``unittest.mock.patch`` on the underlying ``huggingface_hub`` APIs +(``create_bucket`` / ``batch_bucket_files``) so they run without network access. 
+Integration tests write to a local staging directory through the writer and read +the resulting parquet files back via ``ParquetReader``. +""" + +import os +import shutil +import tempfile +import unittest +from typing import Any +from unittest.mock import call, patch + +from datatrove.data import Document +from datatrove.pipeline.readers.parquet import ParquetReader +from datatrove.pipeline.writers.huggingface import HuggingFaceBucketWriter + +from ...utils import require_pyarrow + + +def _make_docs(n: int) -> list[Document]: + return [ + Document( + text=f"hello {i}", + id=f"doc-{i}", + metadata={"split": "train", "score": float(i)}, + ) + for i in range(n) + ] + + +@require_pyarrow +class TestHuggingFaceBucketWriterUnit(unittest.TestCase): + """Unit tests with mocked Hub APIs and real local staging.""" + + def setUp(self) -> None: + self.tmp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp_dir) + self._create_bucket_patcher = patch( + "datatrove.pipeline.writers.huggingface.create_bucket", + return_value=None, + ) + self._batch_patcher = patch( + "datatrove.pipeline.writers.huggingface.batch_bucket_files", + return_value=None, + ) + self.mock_create_bucket = self._create_bucket_patcher.start() + self.mock_batch_bucket_files = self._batch_patcher.start() + self.addCleanup(self._create_bucket_patcher.stop) + self.addCleanup(self._batch_patcher.stop) + + def _make_writer(self, **kwargs: Any) -> HuggingFaceBucketWriter: + defaults: dict[str, Any] = { + "bucket": "org/my-bucket", + "local_working_dir": self.tmp_dir, + } + defaults.update(kwargs) + return HuggingFaceBucketWriter(**defaults) + + def test_init_creates_temp_local_working_dir_when_none(self) -> None: + writer = HuggingFaceBucketWriter(bucket="org/my-bucket") + self.assertTrue(writer.local_working_dir.is_local()) + + def test_init_rejects_non_local_working_dir(self) -> None: + with self.assertRaisesRegex(ValueError, "local"): + HuggingFaceBucketWriter( + bucket="org/my-bucket", + 
local_working_dir="s3://some-bucket/staging/", + ) + + def test_upload_files_creates_bucket_on_first_call_only(self) -> None: + writer = self._make_writer() + # create the local files so cleanup does not fail + for name in ("a.parquet", "b.parquet"): + with writer.local_working_dir.open(name, "wb") as f: + f.write(b"x") + writer.upload_files("a.parquet") + writer.upload_files("b.parquet") + self.assertEqual(self.mock_create_bucket.call_count, 1) + self.mock_create_bucket.assert_called_with("org/my-bucket", private=True, exist_ok=True) + + def test_upload_files_calls_batch_bucket_files_with_resolved_paths(self) -> None: + writer = self._make_writer(prefix="v1") + with writer.local_working_dir.open("file.parquet", "wb") as f: + f.write(b"x") + writer.upload_files("file.parquet") + + self.assertEqual(self.mock_batch_bucket_files.call_count, 1) + args, kwargs = self.mock_batch_bucket_files.call_args + self.assertEqual(args[0], "org/my-bucket") + add = kwargs["add"] + self.assertEqual(len(add), 1) + local_path, remote_path = add[0] + self.assertEqual(local_path, os.path.join(self.tmp_dir, "file.parquet")) + self.assertEqual(remote_path, "v1/file.parquet") + + def test_upload_files_with_empty_prefix_has_no_leading_slash(self) -> None: + writer = self._make_writer(prefix="") + with writer.local_working_dir.open("file.parquet", "wb") as f: + f.write(b"x") + writer.upload_files("file.parquet") + + _, kwargs = self.mock_batch_bucket_files.call_args + _, remote_path = kwargs["add"][0] + self.assertEqual(remote_path, "file.parquet") + + def test_upload_files_cleanup_true_removes_local_files(self) -> None: + writer = self._make_writer(cleanup=True) + path = os.path.join(self.tmp_dir, "x.parquet") + with writer.local_working_dir.open("x.parquet", "wb") as f: + f.write(b"x") + self.assertTrue(os.path.exists(path)) + + writer.upload_files("x.parquet") + self.assertFalse(os.path.exists(path)) + + def test_upload_files_cleanup_false_keeps_local_files(self) -> None: + writer = 
self._make_writer(cleanup=False) + path = os.path.join(self.tmp_dir, "x.parquet") + with writer.local_working_dir.open("x.parquet", "wb") as f: + f.write(b"x") + + writer.upload_files("x.parquet") + self.assertTrue(os.path.exists(path)) + + def test_close_uploads_remaining_open_files(self) -> None: + writer = self._make_writer() + for doc in _make_docs(3): + writer.write(doc) + with patch.object(writer, "upload_files", wraps=writer.upload_files) as mock_upload: + writer.close() + # close() must trigger at least one upload of the open file(s). + self.assertGreaterEqual(mock_upload.call_count, 1) + + def test_close_does_not_create_commit(self) -> None: + """Buckets are commit-less: close() must not call ``create_commit``.""" + writer = self._make_writer() + for doc in _make_docs(3): + writer.write(doc) + with patch("datatrove.pipeline.writers.huggingface.create_commit") as mock_commit: + writer.close() + mock_commit.assert_not_called() + + def test_on_file_switch_uploads_old_file(self) -> None: + """``_on_file_switch`` (called on rotation) must immediately upload the completed file.""" + writer = self._make_writer() + with patch.object(writer, "upload_files") as mock_upload: + writer._on_file_switch("data/00000.parquet", "data/000_00000.parquet", "data/001_00000.parquet") + mock_upload.assert_called_once_with("data/000_00000.parquet") + + def test_full_write_cycle_triggers_uploads_and_stats(self) -> None: + writer = self._make_writer() + docs = _make_docs(5) + for doc in docs: + writer.write(doc) + writer.close() + + # bucket auto-created exactly once. + self.assertEqual(self.mock_create_bucket.call_count, 1) + # at least one upload happened. + self.assertGreaterEqual(self.mock_batch_bucket_files.call_count, 1) + # stats reflect the writes. 
+ self.assertEqual(int(writer.stats["total"].total), len(docs)) + + # --- overwrite mode --- + + def test_overwrite_deletes_existing_files_on_first_upload(self) -> None: + """With overwrite=True, existing files at the prefix are deleted before the first upload.""" + fake_existing = [ + _make_fake_bucket_file("v1/data/old_000.parquet"), + _make_fake_bucket_file("v1/data/old_001.parquet"), + ] + with patch( + "datatrove.pipeline.writers.huggingface.list_bucket_tree", + return_value=iter(fake_existing), + ) as mock_list: + writer = self._make_writer(prefix="v1/data", overwrite=True) + with writer.local_working_dir.open("file.parquet", "wb") as f: + f.write(b"x") + writer.upload_files("file.parquet") + + # list_bucket_tree called once to discover existing files. + mock_list.assert_called_once_with("org/my-bucket", prefix="v1/data", recursive=True) + # First batch_bucket_files call deletes the old files. + delete_call = self.mock_batch_bucket_files.call_args_list[0] + self.assertEqual( + delete_call, call("org/my-bucket", delete=["v1/data/old_000.parquet", "v1/data/old_001.parquet"]) + ) + # Second call is the actual upload. 
+ upload_call = self.mock_batch_bucket_files.call_args_list[1] + self.assertIn("add", upload_call.kwargs) + + def test_overwrite_deletes_only_once(self) -> None: + """The delete-before-upload step must happen exactly once, not on every upload_files call.""" + with patch( + "datatrove.pipeline.writers.huggingface.list_bucket_tree", + return_value=iter([_make_fake_bucket_file("v1/old.parquet")]), + ) as mock_list: + writer = self._make_writer(prefix="v1", overwrite=True) + for name in ("a.parquet", "b.parquet"): + with writer.local_working_dir.open(name, "wb") as f: + f.write(b"x") + writer.upload_files("a.parquet") + writer.upload_files("b.parquet") + + mock_list.assert_called_once() + + def test_overwrite_skips_delete_when_no_existing_files(self) -> None: + """No delete call when there are no existing files at the prefix.""" + with patch( + "datatrove.pipeline.writers.huggingface.list_bucket_tree", + return_value=iter([]), + ): + writer = self._make_writer(prefix="v1", overwrite=True) + with writer.local_working_dir.open("file.parquet", "wb") as f: + f.write(b"x") + writer.upload_files("file.parquet") + + # Only one batch call: the upload. No delete call. 
+ self.assertEqual(self.mock_batch_bucket_files.call_count, 1) + _, kwargs = self.mock_batch_bucket_files.call_args + self.assertIn("add", kwargs) + + def test_overwrite_false_by_default(self) -> None: + """Default behaviour is append — no list/delete happens.""" + writer = self._make_writer() + with writer.local_working_dir.open("file.parquet", "wb") as f: + f.write(b"x") + with patch("datatrove.pipeline.writers.huggingface.list_bucket_tree") as mock_list: + writer.upload_files("file.parquet") + mock_list.assert_not_called() + + +def _make_fake_bucket_file(path: str) -> Any: + """Return a minimal object that behaves like ``huggingface_hub.BucketFile``.""" + from types import SimpleNamespace + + return SimpleNamespace(type="file", path=path, size=100) + + +@require_pyarrow +class TestHuggingFaceBucketWriterIntegration(unittest.TestCase): + """Round-trip integration: write -> stage locally -> read back via ParquetReader.""" + + def setUp(self) -> None: + self.tmp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp_dir) + + def test_parquet_round_trip_via_bucket_writer(self) -> None: + # cleanup=False so files stay around for ParquetReader to pick up. 
+ with ( + patch("datatrove.pipeline.writers.huggingface.create_bucket"), + patch("datatrove.pipeline.writers.huggingface.batch_bucket_files"), + ): + writer = HuggingFaceBucketWriter( + bucket="org/my-bucket", + local_working_dir=self.tmp_dir, + cleanup=False, + ) + originals = _make_docs(7) + with writer: + for doc in originals: + writer.write(doc) + + reader = ParquetReader(self.tmp_dir) + read_back = list(reader()) + self.assertEqual(len(read_back), len(originals)) + for read_doc, original in zip(read_back, originals): + read_doc.metadata.pop("file_path", None) + self.assertEqual(read_doc.text, original.text) + self.assertEqual(read_doc.id, original.id) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_io.py b/tests/test_io.py index 69685f622..368490567 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -7,11 +7,13 @@ import unittest from functools import partial from pathlib import Path +from unittest.mock import patch import boto3 import moto +from huggingface_hub import HfFileSystem -from datatrove.io import get_datafolder, get_shard_from_paths_file, safely_create_file +from datatrove.io import DataFolder, get_datafolder, get_shard_from_paths_file, safely_create_file EXAMPLE_DIRS = ("/home/testuser/somedir", "file:///home/testuser2/somedir", "s3://test-bucket/somedir") @@ -120,5 +122,59 @@ def test_get_shard_from_paths_file_sharding(self): self.assertEqual(set(shard_0) | set(shard_1), set(test_paths)) +class TestIOBuckets(unittest.TestCase): + """Tests for DataFolder when used with hf://buckets/ paths. + + These exercise the integration with HfFileSystem (fsspec) without hitting + the network: HfFileSystem methods are mocked at the instance level. 
+ """ + + BUCKET_URL = "hf://buckets/myorg/my-bucket/data" + + def test_get_datafolder_bucket_url_uses_hf_filesystem(self) -> None: + df = get_datafolder(self.BUCKET_URL) + self.assertIsInstance(df, DataFolder) + self.assertIsInstance(df.fs, HfFileSystem) + self.assertFalse(df.is_local()) + + def test_resolve_paths_bucket(self) -> None: + df = get_datafolder(self.BUCKET_URL) + resolved = df.resolve_paths("file.parquet") + self.assertEqual(resolved, "hf://buckets/myorg/my-bucket/data/file.parquet") + + def test_list_files_bucket_uses_expand_info_false(self) -> None: + df = get_datafolder(self.BUCKET_URL) + # ``find`` is the underlying fsspec call used by ``list_files``. + fake_listing = { + "buckets/myorg/my-bucket/data/a.parquet": {"type": "file"}, + "buckets/myorg/my-bucket/data/sub/b.parquet": {"type": "file"}, + } + with patch.object(df, "find", return_value=fake_listing) as mock_find: + files = df.list_files() + # Files come back sorted with the directory prefix stripped. + self.assertEqual( + files, + [ + "buckets/myorg/my-bucket/data/a.parquet", + "buckets/myorg/my-bucket/data/sub/b.parquet", + ], + ) + # The HF speed-up (expand_info=False) must be passed through. + _, kwargs = mock_find.call_args + self.assertEqual(kwargs.get("expand_info"), False) + self.assertEqual(kwargs.get("detail"), True) + + def test_get_shard_bucket(self) -> None: + df = get_datafolder(self.BUCKET_URL) + files = [f"file_{i}.parquet" for i in range(6)] + with patch.object(df, "list_files", return_value=files): + shard_0 = df.get_shard(0, 3) + shard_1 = df.get_shard(1, 3) + shard_2 = df.get_shard(2, 3) + self.assertEqual(shard_0, ["file_0.parquet", "file_3.parquet"]) + self.assertEqual(shard_1, ["file_1.parquet", "file_4.parquet"]) + self.assertEqual(shard_2, ["file_2.parquet", "file_5.parquet"]) + + if __name__ == "__main__": unittest.main()