diff --git a/gcsfs/tests/conftest.py b/gcsfs/tests/conftest.py index fb4ef565..4c512b53 100644 --- a/gcsfs/tests/conftest.py +++ b/gcsfs/tests/conftest.py @@ -643,6 +643,7 @@ def pytest_ignore_collect(collection_path, config): "pipe", "open", "glob", + "put", } # If only --run-benchmarks-infra is passed, ignore the actual benchmark subfolders. diff --git a/gcsfs/tests/perf/microbenchmarks/README.md b/gcsfs/tests/perf/microbenchmarks/README.md index 7dd1d61c..c60ecfaa 100644 --- a/gcsfs/tests/perf/microbenchmarks/README.md +++ b/gcsfs/tests/perf/microbenchmarks/README.md @@ -2,7 +2,7 @@ ## Introduction -GCSFS microbenchmarks are a suite of performance tests designed to evaluate the efficiency and latency of various Google Cloud Storage file system operations, including read, write, listing, delete, rename, open, and glob. +GCSFS microbenchmarks are a suite of performance tests designed to evaluate the efficiency and latency of various Google Cloud Storage file system operations, including read, write, put, listing, delete, rename, open, and glob. These benchmarks are built using the `pytest` and `pytest-benchmark` frameworks. Each benchmark test is a parameterized pytest case, where the parameters are dynamically configured at runtime from YAML configuration files. This allows for flexible and extensive testing scenarios without modifying the code. @@ -80,7 +80,7 @@ The `run.py` script is the central entry point for executing benchmarks. It hand | Option | Description | Required | | :--- | :--- | :--- | -| `--group` | The benchmark group to run (e.g., `read`, `write`, `listing`, `info`, `open`, `glob`). Runs all groups if not specified. | No | +| `--group` | The benchmark group to run (e.g., `read`, `write`, `put`, `listing`, `info`, `open`, `glob`). Runs all groups if not specified. | No | | `--config` | Specific scenario names to run (e.g., `read_seq`, `list_flat`). Accepts multiple values. | No | | `--regional-bucket` | Name of the regional GCS bucket. | Yes* | | `--zonal-bucket` | Name of the zonal GCS bucket. | Yes* | diff --git a/gcsfs/tests/perf/microbenchmarks/conftest.py b/gcsfs/tests/perf/microbenchmarks/conftest.py index 6e8e6a11..4a5e2d35 100644 --- a/gcsfs/tests/perf/microbenchmarks/conftest.py +++ b/gcsfs/tests/perf/microbenchmarks/conftest.py @@ -1,7 +1,9 @@ import logging import multiprocessing import os +import shutil import statistics +import tempfile import time import uuid from typing import Any, List @@ -35,14 +37,29 @@ def populate_bucket(): return False +def _random_chunks(total_size, max_chunk=100 * MB): + """Yield byte chunks summing to ``total_size``, each at most ``max_chunk``. + + A single 1 MiB random block is generated once and repeated to fill each + chunk. ``os.urandom`` is CPU-bound (~200 MB/s) and otherwise dominates the + setup time for multi-GB files; repeating a block is several times faster + and still produces uncompressible bytes for the network/storage layer (GCS + does not compress object uploads), so measured throughput is unaffected. + """ + block = os.urandom(min(1 * MB, total_size)) + block_len = len(block) + remaining = total_size + while remaining > 0: + write_size = min(max_chunk, remaining) + repeats, remainder = divmod(write_size, block_len) + yield block * repeats + block[:remainder] + remaining -= write_size + + def _write_file(gcs, path, file_size, chunk_size): - chunks_to_write = file_size // chunk_size - remainder = file_size % chunk_size with gcs.open(path, "wb", finalize_on_close=True) as f: - for _ in range(chunks_to_write): - f.write(os.urandom(chunk_size)) - if remainder > 0: - f.write(os.urandom(remainder)) + for chunk in _random_chunks(file_size, chunk_size): + f.write(chunk) actual_size = gcs.info(path)["size"] if actual_size != file_size: @@ -86,6 +103,13 @@ def _prepare_folders(gcs, folder_paths): gcs.mkdir(path, create_parents=True) +def _write_local_file(path, file_size): + """Create a local source file of the given size for put benchmarks.""" + with open(path, "wb") as f: + for chunk in _random_chunks(file_size): + f.write(chunk) + + def _benchmark_io_fixture_helper( extended_gcs_factory, params, prefix_tag, create_files=False, gcs_kwargs=None ): @@ -184,6 +208,75 @@ def gcsfs_benchmark_pipe(extended_gcs_factory, request): ) +def _benchmark_put_fixture_helper(extended_gcs_factory, params, prefix_tag): + gcs = extended_gcs_factory() + + prefix = f"{params.bucket_name}/{prefix_tag}-{uuid.uuid4()}" + file_paths = [f"{prefix}/file_{i}" for i in range(params.files)] + + local_dir = tempfile.mkdtemp(prefix="gcsfs-benchmark-put-") + local_path = os.path.join(local_dir, "source") + + logging.info( + f"Setting up benchmark '{params.name}': creating local source file of " + f"size {params.file_size_bytes / MB:.2f} MB at '{local_path}' and " + f"targeting {params.files} remote destination(s)." + ) + + try: + start_time = time.perf_counter() + _write_local_file(local_path, params.file_size_bytes) + duration_ms = (time.perf_counter() - start_time) * 1000 + logging.info( + f"Benchmark '{params.name}' setup created local source file in {duration_ms:.2f} ms." + ) + + # NOTE: The source file is written immediately before the benchmark and + # is shared by every process/round, so it stays resident in the OS page + # cache. These benchmarks therefore measure upload throughput from a + # cached source (representative of "upload a file you just produced"), + # not from cold disk. + yield gcs, local_path, file_paths, params + + # Verify upload integrity outside the timed region. gcsfs defaults to + # consistency="none" (no client-side checksum on put), so we assert the + # uploaded object sizes match the local source. The cache is invalidated + # first because multi-process uploads run on separate gcs instances. + gcs.invalidate_cache() + for path in file_paths: + actual_size = gcs.info(path)["size"] + if actual_size != params.file_size_bytes: + raise RuntimeError( + f"Upload integrity check failed for {path}. " + f"Expected size: {params.file_size_bytes}, " + f"Actual size: {actual_size}" + ) + + finally: + # --- Teardown --- + logging.info( + f"Tearing down benchmark '{params.name}': deleting remote files and local source." + ) + try: + gcs.rm(prefix, recursive=True) + except Exception as e: + logging.error(f"Failed to clean up benchmark files: {e!r}") + shutil.rmtree(local_dir, ignore_errors=True) + + +@pytest.fixture +def gcsfs_benchmark_put(extended_gcs_factory, request): + """ + A fixture that sets up the environment for a put benchmark run. + It provides a GCSFS instance, a single local source file to upload, and a + list of remote destination paths to upload it to. + """ + params = request.param + yield from _benchmark_put_fixture_helper( + extended_gcs_factory, params, "benchmark-put" + ) + + def _benchmark_listing_fixture_helper( extended_gcs_factory, params, diff --git a/gcsfs/tests/perf/microbenchmarks/put/configs.py b/gcsfs/tests/perf/microbenchmarks/put/configs.py new file mode 100644 index 00000000..416488ca --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/put/configs.py @@ -0,0 +1,55 @@ +import itertools + +from gcsfs.tests.perf.microbenchmarks.configs import BaseBenchmarkConfigurator +from gcsfs.tests.perf.microbenchmarks.conftest import MB + +from .parameters import PutBenchmarkParameters + + +class PutConfigurator(BaseBenchmarkConfigurator): + def build_cases(self, scenario, common_config): + procs_list = scenario.get("processes", [1]) + threads_list = scenario.get("threads", [1]) + bucket_types = common_config.get("bucket_types", ["regional"]) + file_sizes_mb = common_config.get("file_sizes_mb", [200]) + chunk_sizes_mb = common_config.get("chunk_sizes_mb", [50]) + rounds = common_config.get("rounds", 1) + + cases = [] + param_combinations = itertools.product( + procs_list, threads_list, file_sizes_mb, chunk_sizes_mb, bucket_types + ) + + for ( + procs, + threads, + file_size_mb, + chunk_size_mb, + bucket_type, + ) in param_combinations: + bucket_name = self.get_bucket_name(bucket_type) + if not bucket_name: + continue + + name = ( + f"{scenario['name']}_{procs}procs_{threads}threads_" + f"{file_size_mb}MB_file_{chunk_size_mb}MB_chunk_{bucket_type}" + ) + + params = PutBenchmarkParameters( + name=name, + bucket_name=bucket_name, + bucket_type=bucket_type, + threads=threads, + processes=procs, + files=threads * procs, + rounds=rounds, + file_size_bytes=int(file_size_mb * MB), + chunk_size_bytes=int(chunk_size_mb * MB), + ) + cases.append(params) + return cases + + +def get_put_benchmark_cases(): + return PutConfigurator(__file__).generate_cases() diff --git a/gcsfs/tests/perf/microbenchmarks/put/configs.yaml b/gcsfs/tests/perf/microbenchmarks/put/configs.yaml new file mode 100644 index 00000000..01df7ef4 --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/put/configs.yaml @@ -0,0 +1,15 @@ +common: + bucket_types: + - "regional" + - "zonal" + file_sizes_mb: + - 4096 # 4 GB + chunk_sizes_mb: + - 50 # 50 MB Default chunk size + rounds: 3 + +scenarios: + - name: "put_file" + + - name: "put_file_multi_process" + processes: [4, 8] diff --git a/gcsfs/tests/perf/microbenchmarks/put/parameters.py b/gcsfs/tests/perf/microbenchmarks/put/parameters.py new file mode 100644 index 00000000..3875d97f --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/put/parameters.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass + +from gcsfs.tests.perf.microbenchmarks.parameters import IOBenchmarkParameters + + +@dataclass +class PutBenchmarkParameters(IOBenchmarkParameters): + """ + Defines the parameters for a put benchmark test case. + + A put benchmark uploads a local file from disk to GCS, so the relevant + knobs (``file_size_bytes`` for the local source size and + ``chunk_size_bytes`` for the resumable upload chunk size) are already + provided by ``IOBenchmarkParameters``. + """ + + pass diff --git a/gcsfs/tests/perf/microbenchmarks/put/test_put.py b/gcsfs/tests/perf/microbenchmarks/put/test_put.py new file mode 100644 index 00000000..f36e13d5 --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/put/test_put.py @@ -0,0 +1,117 @@ +import logging +import time +from concurrent.futures import ThreadPoolExecutor + +import pytest + +from gcsfs.tests.perf.microbenchmarks.put.configs import get_put_benchmark_cases +from gcsfs.tests.perf.microbenchmarks.runner import ( + filter_test_cases, + run_multi_process, + run_single_threaded, +) + +BENCHMARK_GROUP = "put" + + +def _put_op(gcs, local_path, remote_path, chunk_size): + """Upload a local file to a single remote path.""" + try: + gcs.put(local_path, remote_path, chunksize=chunk_size) + except Exception as e: + logging.error(f"Error putting {local_path} to {remote_path}: {e}") + raise + + +all_benchmark_cases = get_put_benchmark_cases() +single_threaded_cases, _, multi_process_cases = filter_test_cases(all_benchmark_cases) + + +@pytest.mark.parametrize( + "gcsfs_benchmark_put", + single_threaded_cases, + indirect=True, + ids=lambda p: p.name, +) +def test_put_single_threaded(benchmark, gcsfs_benchmark_put, monitor): + gcs, local_path, file_paths, params = gcsfs_benchmark_put + + op_args = ( + gcs, + local_path, + file_paths[0], + params.chunk_size_bytes, + ) + run_single_threaded( + benchmark, + monitor, + params, + _put_op, + op_args, + BENCHMARK_GROUP, + ) + + +def _process_worker( + gcs, + local_path, + file_paths, + chunk_size, + process_durations_shared, + index, +): + """A worker function for each process to upload files concurrently.""" + start_time = time.perf_counter() + + with ThreadPoolExecutor(max_workers=len(file_paths)) as executor: + futures = [ + executor.submit( + _put_op, + gcs, + local_path, + remote_path, + chunk_size, + ) + for remote_path in file_paths + ] + [f.result() for f in futures] + + duration_s = time.perf_counter() - start_time + process_durations_shared[index] = duration_s + + +@pytest.mark.parametrize( + "gcsfs_benchmark_put", + multi_process_cases, + indirect=True, + ids=lambda p: p.name, +) +def test_put_multi_process( + benchmark, gcsfs_benchmark_put, extended_gcs_factory, request, monitor +): + _, local_path, file_paths, params = gcsfs_benchmark_put + files_per_process = params.files // params.processes + + def args_builder(gcs_instance, i, shared_arr): + start_index = i * files_per_process + end_index = start_index + files_per_process + process_files = file_paths[start_index:end_index] + return ( + gcs_instance, + local_path, + process_files, + params.chunk_size_bytes, + shared_arr, + i, + ) + + run_multi_process( + benchmark, + monitor, + params, + extended_gcs_factory, + worker_target=_process_worker, + args_builder=args_builder, + benchmark_group=BENCHMARK_GROUP, + request=request, + ) diff --git a/gcsfs/tests/perf/microbenchmarks/test_configs.py b/gcsfs/tests/perf/microbenchmarks/test_configs.py index adaf5453..ae61dcba 100644 --- a/gcsfs/tests/perf/microbenchmarks/test_configs.py +++ b/gcsfs/tests/perf/microbenchmarks/test_configs.py @@ -14,6 +14,10 @@ get_listing_benchmark_cases, ) from gcsfs.tests.perf.microbenchmarks.open.configs import get_open_benchmark_cases +from gcsfs.tests.perf.microbenchmarks.put.configs import ( + PutConfigurator, + get_put_benchmark_cases, +) from gcsfs.tests.perf.microbenchmarks.read.configs import ( ReadConfigurator, get_read_benchmark_cases, @@ -157,6 +161,29 @@ def test_write_configurator(mock_config_dependencies): assert case.files == 2 # threads * processes +def test_put_configurator(mock_config_dependencies): + """Test that PutConfigurator correctly builds benchmark parameters.""" + common = { + "bucket_types": ["regional"], + "file_sizes_mb": [200], + "chunk_sizes_mb": [50], + "rounds": 1, + } + scenario = {"name": "put_test", "processes": [2], "threads": [1]} + + configurator = PutConfigurator("dummy") + cases = configurator.build_cases(scenario, common) + + assert len(cases) == 1 + case = cases[0] + assert case.name == "put_test_2procs_1threads_200MB_file_50MB_chunk_regional" + assert case.file_size_bytes == 200 * MB + assert case.chunk_size_bytes == 50 * MB + assert case.processes == 2 + assert case.files == 2 # threads * processes + assert case.bucket_name == "test-bucket" + + def test_listing_configurator(mock_config_dependencies): """Test that ListingConfigurator correctly builds benchmark parameters.""" common = {"bucket_types": ["regional"], "rounds": 1} @@ -273,6 +300,10 @@ def test_validate_actual_yaml_configs(): cases = get_open_benchmark_cases() assert len(cases) > 0, "Open config produced no cases" + # Put + cases = get_put_benchmark_cases() + assert len(cases) > 0, "Put config produced no cases" + # Glob cases = get_glob_benchmark_cases() assert len(cases) > 0, "Glob config produced no cases"