-
Notifications
You must be signed in to change notification settings - Fork 176
test(perf): add microbenchmarks for put operations #939
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| import itertools | ||
|
|
||
| from gcsfs.tests.perf.microbenchmarks.configs import BaseBenchmarkConfigurator | ||
| from gcsfs.tests.perf.microbenchmarks.conftest import MB | ||
|
|
||
| from .parameters import PutBenchmarkParameters | ||
|
|
||
|
|
||
| class PutConfigurator(BaseBenchmarkConfigurator): | ||
| def build_cases(self, scenario, common_config): | ||
| procs_list = scenario.get("processes", [1]) | ||
| threads_list = scenario.get("threads", [1]) | ||
| bucket_types = common_config.get("bucket_types", ["regional"]) | ||
| file_sizes_mb = common_config.get("file_sizes_mb", [200]) | ||
| chunk_sizes_mb = common_config.get("chunk_sizes_mb", [50]) | ||
| rounds = common_config.get("rounds", 1) | ||
|
|
||
| cases = [] | ||
| param_combinations = itertools.product( | ||
| procs_list, threads_list, file_sizes_mb, chunk_sizes_mb, bucket_types | ||
| ) | ||
|
|
||
| for ( | ||
| procs, | ||
| threads, | ||
| file_size_mb, | ||
| chunk_size_mb, | ||
| bucket_type, | ||
| ) in param_combinations: | ||
| bucket_name = self.get_bucket_name(bucket_type) | ||
| if not bucket_name: | ||
| continue | ||
|
|
||
| name = ( | ||
| f"{scenario['name']}_{procs}procs_{threads}threads_" | ||
| f"{file_size_mb}MB_file_{chunk_size_mb}MB_chunk_{bucket_type}" | ||
| ) | ||
|
|
||
| params = PutBenchmarkParameters( | ||
| name=name, | ||
| bucket_name=bucket_name, | ||
| bucket_type=bucket_type, | ||
| threads=threads, | ||
| processes=procs, | ||
| files=threads * procs, | ||
| rounds=rounds, | ||
| file_size_bytes=int(file_size_mb * MB), | ||
| chunk_size_bytes=int(chunk_size_mb * MB), | ||
| ) | ||
| cases.append(params) | ||
| return cases | ||
|
|
||
|
|
||
| def get_put_benchmark_cases(): | ||
| return PutConfigurator(__file__).generate_cases() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| common: | ||
| bucket_types: | ||
| - "regional" | ||
| - "zonal" | ||
| file_sizes_mb: | ||
| - 4096 # 4 GB | ||
|
zhixiangli marked this conversation as resolved.
|
||
| chunk_sizes_mb: | ||
| - 50 # 50 MB Default chunk size | ||
| rounds: 3 | ||
|
|
||
| scenarios: | ||
| - name: "put_file" | ||
|
|
||
| - name: "put_file_multi_process" | ||
| processes: [4, 8] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| from dataclasses import dataclass | ||
|
|
||
| from gcsfs.tests.perf.microbenchmarks.parameters import IOBenchmarkParameters | ||
|
|
||
|
|
||
| @dataclass | ||
| class PutBenchmarkParameters(IOBenchmarkParameters): | ||
| """ | ||
| Defines the parameters for a put benchmark test case. | ||
|
|
||
| A put benchmark uploads a local file from disk to GCS, so the relevant | ||
| knobs (``file_size_bytes`` for the local source size and | ||
| ``chunk_size_bytes`` for the resumable upload chunk size) are already | ||
| provided by ``IOBenchmarkParameters``. | ||
| """ | ||
|
|
||
| pass |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,117 @@ | ||
| import logging | ||
| import time | ||
| from concurrent.futures import ThreadPoolExecutor | ||
|
|
||
| import pytest | ||
|
|
||
| from gcsfs.tests.perf.microbenchmarks.put.configs import get_put_benchmark_cases | ||
| from gcsfs.tests.perf.microbenchmarks.runner import ( | ||
| filter_test_cases, | ||
| run_multi_process, | ||
| run_single_threaded, | ||
| ) | ||
|
|
||
| BENCHMARK_GROUP = "put" | ||
|
|
||
|
|
||
| def _put_op(gcs, local_path, remote_path, chunk_size): | ||
| """Upload a local file to a single remote path.""" | ||
| try: | ||
| gcs.put(local_path, remote_path, chunksize=chunk_size) | ||
|
zhixiangli marked this conversation as resolved.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we pass blocksize?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. put routes through _put_file, which takes chunksize (used directly as the resumable-upload part size). block_size is only a parameter of the open()/GCSFile path, so passing it to put would land in **kwargs and be ignored. |
||
| except Exception as e: | ||
| logging.error(f"Error putting {local_path} to {remote_path}: {e}") | ||
| raise | ||
|
|
||
|
|
||
| all_benchmark_cases = get_put_benchmark_cases() | ||
| single_threaded_cases, _, multi_process_cases = filter_test_cases(all_benchmark_cases) | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| "gcsfs_benchmark_put", | ||
| single_threaded_cases, | ||
| indirect=True, | ||
| ids=lambda p: p.name, | ||
| ) | ||
| def test_put_single_threaded(benchmark, gcsfs_benchmark_put, monitor): | ||
| gcs, local_path, file_paths, params = gcsfs_benchmark_put | ||
|
|
||
| op_args = ( | ||
| gcs, | ||
| local_path, | ||
| file_paths[0], | ||
| params.chunk_size_bytes, | ||
| ) | ||
| run_single_threaded( | ||
| benchmark, | ||
| monitor, | ||
| params, | ||
| _put_op, | ||
| op_args, | ||
| BENCHMARK_GROUP, | ||
| ) | ||
|
|
||
|
|
||
| def _process_worker( | ||
| gcs, | ||
| local_path, | ||
| file_paths, | ||
| chunk_size, | ||
| process_durations_shared, | ||
| index, | ||
| ): | ||
| """A worker function for each process to upload files concurrently.""" | ||
| start_time = time.perf_counter() | ||
|
|
||
| with ThreadPoolExecutor(max_workers=len(file_paths)) as executor: | ||
| futures = [ | ||
| executor.submit( | ||
| _put_op, | ||
| gcs, | ||
| local_path, | ||
| remote_path, | ||
| chunk_size, | ||
| ) | ||
| for remote_path in file_paths | ||
| ] | ||
| [f.result() for f in futures] | ||
|
|
||
| duration_s = time.perf_counter() - start_time | ||
| process_durations_shared[index] = duration_s | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| "gcsfs_benchmark_put", | ||
| multi_process_cases, | ||
| indirect=True, | ||
| ids=lambda p: p.name, | ||
| ) | ||
| def test_put_multi_process( | ||
| benchmark, gcsfs_benchmark_put, extended_gcs_factory, request, monitor | ||
| ): | ||
| _, local_path, file_paths, params = gcsfs_benchmark_put | ||
| files_per_process = params.files // params.processes | ||
|
|
||
| def args_builder(gcs_instance, i, shared_arr): | ||
| start_index = i * files_per_process | ||
| end_index = start_index + files_per_process | ||
| process_files = file_paths[start_index:end_index] | ||
| return ( | ||
| gcs_instance, | ||
| local_path, | ||
| process_files, | ||
| params.chunk_size_bytes, | ||
| shared_arr, | ||
| i, | ||
| ) | ||
|
|
||
| run_multi_process( | ||
| benchmark, | ||
| monitor, | ||
| params, | ||
| extended_gcs_factory, | ||
| worker_target=_process_worker, | ||
| args_builder=args_builder, | ||
| benchmark_group=BENCHMARK_GROUP, | ||
| request=request, | ||
| ) | ||
Uh oh!
There was an error while loading. Please reload this page.