Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gcsfs/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ def pytest_ignore_collect(collection_path, config):
"pipe",
"open",
"glob",
"put",
}

# If only --run-benchmarks-infra is passed, ignore the actual benchmark subfolders.
Expand Down
4 changes: 2 additions & 2 deletions gcsfs/tests/perf/microbenchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Introduction

GCSFS microbenchmarks are a suite of performance tests designed to evaluate the efficiency and latency of various Google Cloud Storage file system operations, including read, write, listing, delete, rename, open, and glob.
GCSFS microbenchmarks are a suite of performance tests designed to evaluate the efficiency and latency of various Google Cloud Storage file system operations, including read, write, put, listing, delete, rename, open, and glob.

These benchmarks are built using the `pytest` and `pytest-benchmark` frameworks. Each benchmark test is a parameterized pytest case, where the parameters are dynamically configured at runtime from YAML configuration files. This allows for flexible and extensive testing scenarios without modifying the code.

Expand Down Expand Up @@ -80,7 +80,7 @@ The `run.py` script is the central entry point for executing benchmarks. It hand

| Option | Description | Required |
| :--- | :--- | :--- |
| `--group` | The benchmark group to run (e.g., `read`, `write`, `listing`, `info`, `open`, `glob`). Runs all groups if not specified. | No |
| `--group` | The benchmark group to run (e.g., `read`, `write`, `put`, `listing`, `info`, `open`, `glob`). Runs all groups if not specified. | No |
| `--config` | Specific scenario names to run (e.g., `read_seq`, `list_flat`). Accepts multiple values. | No |
| `--regional-bucket` | Name of the regional GCS bucket. | Yes* |
| `--zonal-bucket` | Name of the zonal GCS bucket. | Yes* |
Expand Down
105 changes: 99 additions & 6 deletions gcsfs/tests/perf/microbenchmarks/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
import multiprocessing
import os
import shutil
import statistics
import tempfile
import time
import uuid
from typing import Any, List
Expand Down Expand Up @@ -35,14 +37,29 @@ def populate_bucket():
return False


def _random_chunks(total_size, max_chunk=100 * MB):
    """Yield byte chunks summing to ``total_size``, each at most ``max_chunk``.

    A single random block (at most ``1 * MB`` bytes) is generated once and
    repeated to fill each chunk. ``os.urandom`` is CPU-bound (~200 MB/s) and
    otherwise dominates the setup time for multi-GB files; repeating a block
    is several times faster and still produces uncompressible bytes for the
    network/storage layer (GCS does not compress object uploads), so measured
    throughput is unaffected.

    Args:
        total_size: Total number of bytes to produce across all chunks.
        max_chunk: Upper bound, in bytes, on the size of each yielded chunk.

    Yields:
        ``bytes`` objects whose lengths add up to ``total_size``. Nothing is
        yielded when ``total_size`` is 0.

    Raises:
        ValueError: If ``max_chunk`` is not positive. Because this is a
            generator, the error surfaces on the first iteration.
    """
    # A non-positive chunk size would make ``write_size`` <= 0 below, so
    # ``remaining`` would never shrink and the loop would yield forever.
    if max_chunk <= 0:
        raise ValueError(f"max_chunk must be positive, got {max_chunk}")

    block = os.urandom(min(1 * MB, total_size))
    block_len = len(block)
    remaining = total_size
    while remaining > 0:
        write_size = min(max_chunk, remaining)
        # Tile the random block to the requested size, then top up with a
        # partial block for whatever does not divide evenly.
        repeats, remainder = divmod(write_size, block_len)
        yield block * repeats + block[:remainder]
        remaining -= write_size
Comment thread
zhixiangli marked this conversation as resolved.


def _write_file(gcs, path, file_size, chunk_size):
chunks_to_write = file_size // chunk_size
remainder = file_size % chunk_size
with gcs.open(path, "wb", finalize_on_close=True) as f:
for _ in range(chunks_to_write):
f.write(os.urandom(chunk_size))
if remainder > 0:
f.write(os.urandom(remainder))
for chunk in _random_chunks(file_size, chunk_size):
f.write(chunk)

actual_size = gcs.info(path)["size"]
if actual_size != file_size:
Expand Down Expand Up @@ -86,6 +103,13 @@ def _prepare_folders(gcs, folder_paths):
gcs.mkdir(path, create_parents=True)


def _write_local_file(path, file_size):
    """Write ``file_size`` bytes of generated data to a local file at ``path``.

    The file is opened in binary write mode and filled with the chunks
    produced by ``_random_chunks``; put benchmarks use it as their upload
    source.
    """
    with open(path, "wb") as sink:
        sink.writelines(_random_chunks(file_size))


def _benchmark_io_fixture_helper(
extended_gcs_factory, params, prefix_tag, create_files=False, gcs_kwargs=None
):
Expand Down Expand Up @@ -184,6 +208,75 @@ def gcsfs_benchmark_pipe(extended_gcs_factory, request):
)


def _benchmark_put_fixture_helper(extended_gcs_factory, params, prefix_tag):
    """Generator that prepares, verifies and cleans up one put benchmark run.

    Setup creates a filesystem via ``extended_gcs_factory``, derives
    ``params.files`` destination paths under a unique
    ``{bucket}/{prefix_tag}-{uuid}`` prefix, and writes a single local source
    file of ``params.file_size_bytes`` bytes into a fresh temporary directory.

    Yields:
        A ``(gcs, local_path, file_paths, params)`` tuple.

    Raises:
        RuntimeError: After the consumer resumes the generator, if any
            destination object's size differs from ``params.file_size_bytes``.

    The remote prefix and the local temporary directory are removed in a
    ``finally`` block, so cleanup runs whether or not verification passes.
    """
    gcs = extended_gcs_factory()

    remote_root = f"{params.bucket_name}/{prefix_tag}-{uuid.uuid4()}"
    file_paths = [f"{remote_root}/file_{n}" for n in range(params.files)]

    scratch_dir = tempfile.mkdtemp(prefix="gcsfs-benchmark-put-")
    local_path = os.path.join(scratch_dir, "source")

    logging.info(
        f"Setting up benchmark '{params.name}': creating local source file of "
        f"size {params.file_size_bytes / MB:.2f} MB at '{local_path}' and "
        f"targeting {params.files} remote destination(s)."
    )

    try:
        began = time.perf_counter()
        _write_local_file(local_path, params.file_size_bytes)
        elapsed_ms = (time.perf_counter() - began) * 1000
        logging.info(
            f"Benchmark '{params.name}' setup created local source file in {elapsed_ms:.2f} ms."
        )

        # NOTE: the source file is written right before the benchmark and is
        # shared by every process/round, so it stays resident in the OS page
        # cache. The benchmarks therefore measure upload throughput from a
        # cached source ("upload a file you just produced"), not cold disk.
        yield gcs, local_path, file_paths, params

        # Integrity check, outside the timed region. gcsfs defaults to
        # consistency="none" (no client-side checksum on put), so compare the
        # uploaded object sizes with the local source size. The listing cache
        # is dropped first because multi-process uploads run on separate gcs
        # instances.
        gcs.invalidate_cache()
        for remote_path in file_paths:
            uploaded_size = gcs.info(remote_path)["size"]
            if uploaded_size == params.file_size_bytes:
                continue
            raise RuntimeError(
                f"Upload integrity check failed for {remote_path}. "
                f"Expected size: {params.file_size_bytes}, "
                f"Actual size: {uploaded_size}"
            )

    finally:
        # --- Teardown ---
        logging.info(
            f"Tearing down benchmark '{params.name}': deleting remote files and local source."
        )
        # Remote cleanup is best-effort: a failure is logged, and the local
        # temporary directory is still removed afterwards.
        try:
            gcs.rm(remote_root, recursive=True)
        except Exception as cleanup_error:
            logging.error(f"Failed to clean up benchmark files: {cleanup_error!r}")
        shutil.rmtree(scratch_dir, ignore_errors=True)


@pytest.fixture
def gcsfs_benchmark_put(extended_gcs_factory, request):
    """
    Pytest fixture providing the environment for a single put benchmark case.

    The case parameters are taken from ``request.param``. Everything else is
    delegated to ``_benchmark_put_fixture_helper`` with the "benchmark-put"
    prefix tag, which yields a GCSFS instance, the path of one local source
    file, the list of remote destination paths, and the parameters.
    """
    yield from _benchmark_put_fixture_helper(
        extended_gcs_factory, request.param, "benchmark-put"
    )


def _benchmark_listing_fixture_helper(
extended_gcs_factory,
params,
Expand Down
55 changes: 55 additions & 0 deletions gcsfs/tests/perf/microbenchmarks/put/configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import itertools

from gcsfs.tests.perf.microbenchmarks.configs import BaseBenchmarkConfigurator
from gcsfs.tests.perf.microbenchmarks.conftest import MB

from .parameters import PutBenchmarkParameters


class PutConfigurator(BaseBenchmarkConfigurator):
    """Expands put benchmark scenarios into ``PutBenchmarkParameters`` cases."""

    def build_cases(self, scenario, common_config):
        """Return one case per parameter combination for ``scenario``.

        The combinations are the cartesian product of the scenario's
        ``processes`` and ``threads`` lists with the common ``file_sizes_mb``,
        ``chunk_sizes_mb`` and ``bucket_types`` lists. Combinations whose
        bucket type resolves to no bucket name are skipped. Each case uploads
        ``threads * processes`` files.
        """
        rounds = common_config.get("rounds", 1)
        axes = (
            scenario.get("processes", [1]),
            scenario.get("threads", [1]),
            common_config.get("file_sizes_mb", [200]),
            common_config.get("chunk_sizes_mb", [50]),
            common_config.get("bucket_types", ["regional"]),
        )

        cases = []
        for combo in itertools.product(*axes):
            procs, threads, file_size_mb, chunk_size_mb, bucket_type = combo

            bucket_name = self.get_bucket_name(bucket_type)
            if not bucket_name:
                # No bucket is available for this bucket type.
                continue

            cases.append(
                PutBenchmarkParameters(
                    name=(
                        f"{scenario['name']}_{procs}procs_{threads}threads_"
                        f"{file_size_mb}MB_file_{chunk_size_mb}MB_chunk_{bucket_type}"
                    ),
                    bucket_name=bucket_name,
                    bucket_type=bucket_type,
                    threads=threads,
                    processes=procs,
                    files=threads * procs,
                    rounds=rounds,
                    file_size_bytes=int(file_size_mb * MB),
                    chunk_size_bytes=int(chunk_size_mb * MB),
                )
            )
        return cases


def get_put_benchmark_cases():
    """Return the put benchmark cases generated by ``PutConfigurator``.

    The configurator is constructed with this module's ``__file__``.
    """
    configurator = PutConfigurator(__file__)
    return configurator.generate_cases()
15 changes: 15 additions & 0 deletions gcsfs/tests/perf/microbenchmarks/put/configs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
common:
bucket_types:
- "regional"
- "zonal"
file_sizes_mb:
- 4096 # 4 GB
Comment thread
zhixiangli marked this conversation as resolved.
chunk_sizes_mb:
- 50 # 50 MB Default chunk size
rounds: 3

scenarios:
- name: "put_file"

- name: "put_file_multi_process"
processes: [4, 8]
17 changes: 17 additions & 0 deletions gcsfs/tests/perf/microbenchmarks/put/parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dataclasses import dataclass

from gcsfs.tests.perf.microbenchmarks.parameters import IOBenchmarkParameters


@dataclass
class PutBenchmarkParameters(IOBenchmarkParameters):
    """
    Parameters describing a single put benchmark test case.

    A put benchmark uploads a local file from disk to GCS. This class adds no
    fields of its own: the knobs it needs (``file_size_bytes`` for the size of
    the local source and ``chunk_size_bytes`` for the upload chunk size) are
    inherited from ``IOBenchmarkParameters``.
    """
117 changes: 117 additions & 0 deletions gcsfs/tests/perf/microbenchmarks/put/test_put.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor

import pytest

from gcsfs.tests.perf.microbenchmarks.put.configs import get_put_benchmark_cases
from gcsfs.tests.perf.microbenchmarks.runner import (
filter_test_cases,
run_multi_process,
run_single_threaded,
)

# Benchmark group name handed to the runner helpers by every test in this module.
BENCHMARK_GROUP = "put"


def _put_op(gcs, local_path, remote_path, chunk_size):
"""Upload a local file to a single remote path."""
try:
gcs.put(local_path, remote_path, chunksize=chunk_size)
Comment thread
zhixiangli marked this conversation as resolved.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we pass blocksize?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put routes through _put_file, which takes chunksize (used directly as the resumable-upload part size). block_size is only a parameter of the open()/GCSFile path, so passing it to put would land in **kwargs and be ignored.

except Exception as e:
logging.error(f"Error putting {local_path} to {remote_path}: {e}")
raise


# Build every put benchmark case once at import time so the parametrize
# decorators below can reference them. filter_test_cases returns three groups;
# the middle one is not used by this module.
all_benchmark_cases = get_put_benchmark_cases()
single_threaded_cases, _, multi_process_cases = filter_test_cases(all_benchmark_cases)


@pytest.mark.parametrize(
    "gcsfs_benchmark_put",
    single_threaded_cases,
    indirect=True,
    ids=lambda case: case.name,
)
def test_put_single_threaded(benchmark, gcsfs_benchmark_put, monitor):
    """Benchmark uploading the local source file to the first destination path."""
    gcs, local_path, file_paths, params = gcsfs_benchmark_put
    destination = file_paths[0]

    run_single_threaded(
        benchmark,
        monitor,
        params,
        _put_op,
        (gcs, local_path, destination, params.chunk_size_bytes),
        BENCHMARK_GROUP,
    )


def _process_worker(
gcs,
local_path,
file_paths,
chunk_size,
process_durations_shared,
index,
):
"""A worker function for each process to upload files concurrently."""
start_time = time.perf_counter()

with ThreadPoolExecutor(max_workers=len(file_paths)) as executor:
futures = [
executor.submit(
_put_op,
gcs,
local_path,
remote_path,
chunk_size,
)
for remote_path in file_paths
]
[f.result() for f in futures]

duration_s = time.perf_counter() - start_time
process_durations_shared[index] = duration_s


@pytest.mark.parametrize(
    "gcsfs_benchmark_put",
    multi_process_cases,
    indirect=True,
    ids=lambda case: case.name,
)
def test_put_multi_process(
    benchmark, gcsfs_benchmark_put, extended_gcs_factory, request, monitor
):
    """Benchmark uploading the local source file from several processes.

    The destination paths are split into equal contiguous slices, one per
    process; the fixture's own filesystem instance is not used here.
    """
    _, local_path, file_paths, params = gcsfs_benchmark_put
    files_per_process = params.files // params.processes

    def args_builder(gcs_instance, i, shared_arr):
        """Build the ``_process_worker`` argument tuple for process ``i``."""
        offset = i * files_per_process
        return (
            gcs_instance,
            local_path,
            file_paths[offset : offset + files_per_process],
            params.chunk_size_bytes,
            shared_arr,
            i,
        )

    run_multi_process(
        benchmark,
        monitor,
        params,
        extended_gcs_factory,
        worker_target=_process_worker,
        args_builder=args_builder,
        benchmark_group=BENCHMARK_GROUP,
        request=request,
    )
Loading
Loading