Thanks to @rsignell-usgs's script, I've been experimenting with netCDF-to-Zarr conversion on S3. Is there any published throughput data I can use to make sense of the measurements below? If anyone has transferred Zarr data to S3 or GCP before, I'd like to hear about your experience and any best practices for this kind of task, for example how to tune a Dask cluster to maximize throughput.
Dask configuration
- 1 worker
- 72 threads per worker

| Data size (GB) | Chunk size | Transfer time (s) | Throughput (Mb/s) |
|---|---|---|---|
| 5.1 | (1, 1032, 289, 288) | 285.2 | 146 |
| 5.1 | (1, 516, 289, 288) | 309.3 | 135 |
| 5.1 | (1, 258, 289, 288) | 350.7 | 119 |
| 5.1 | (1, 129, 289, 288) | 439.0 | 95 |
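
To put the chunk shapes above in terms of bytes, here is a rough sketch that computes the uncompressed size of one chunk. It assumes the variable is stored as 4-byte floats (`float32`), which the post does not state; `chunk_nbytes` is a hypothetical helper name, and the compressed objects that actually land on S3 will usually be smaller.

```python
from math import prod


def chunk_nbytes(shape, itemsize=4):
    """Uncompressed size of one chunk in bytes.

    itemsize=4 assumes float32 data; this is an assumption, not something
    stated in the post.
    """
    return prod(shape) * itemsize


if __name__ == "__main__":
    shapes = [
        (1, 1032, 289, 288),
        (1, 516, 289, 288),
        (1, 258, 289, 288),
        (1, 129, 289, 288),
    ]
    for shape in shapes:
        mib = chunk_nbytes(shape) / 2**20
        print(f"{shape}: {mib:.1f} MiB per chunk (uncompressed, float32 assumed)")
```

Under that assumption, each halving of the second dimension halves the chunk size, so the smallest chunks tested are one eighth the size of the largest.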
Dask configuration
- 2 workers on the same machine
- 72 threads per worker

| Data size (GB) | Chunk size | Transfer time (s) | Throughput (Mb/s) |
|---|---|---|---|
| 5.1 | (1, 1032, 289, 288) | 16 | 2611 |
| 5.1 | (1, 516, 289, 288) | 18 | 2321 |
| 5.1 | (1, 258, 289, 288) | 28 | 1492 |
| 5.1 | (1, 129, 289, 288) | 47 | 889 |
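
For anyone wanting to reproduce the throughput column, the sketch below recomputes it from the data size and transfer time. The conversion (1 GB = 1024 MB, 8 bits per byte, so Mb/s means megabits per second) is inferred from the reported numbers, not stated in the post; `throughput_mbps` is a hypothetical helper name.

```python
def throughput_mbps(size_gb, seconds):
    """Throughput in megabits per second.

    Assumes 1 GB = 1024 MB and 8 bits per byte; this convention is inferred
    from the tables, which it reproduces after rounding.
    """
    return size_gb * 1024 * 8 / seconds


if __name__ == "__main__":
    # (workers, transfer time in seconds) for the 5.1 GB dataset
    measurements = [
        (1, 285.2), (1, 309.3), (1, 350.7), (1, 439.0),
        (2, 16), (2, 18), (2, 28), (2, 47),
    ]
    for workers, seconds in measurements:
        rate = throughput_mbps(5.1, seconds)
        print(f"{workers} worker(s), {seconds:>6} s -> {rate:7.0f} Mb/s")
```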
Here's my script:

    import time
    from pathlib import Path

    import s3fs
    import xarray as xr
    from dask.distributed import Client

    if __name__ == '__main__':
        # Single in-process worker with 72 threads
        client = Client(processes=False, n_workers=1, threads_per_worker=72)
        print(client)

        root_dir = Path("/glade/p_old/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS")
        CASE = 'b.e11.B20TRC5CNBDRD.f09_g16'
        list_1 = sorted(root_dir.glob(f"{CASE}.???.cam.h0.*"))

        # Indices of special runs to remove from the original list.
        # These runs' outputs have additional variables and/or special time ranges.
        indices = 0, 33, 34
        updated_list = [item for index, item in enumerate(list_1) if index not in indices]

        dset = xr.open_mfdataset(updated_list, concat_dim='ensemble')
        dset = dset.chunk({'ensemble': 1, 'time': 516})

        # Output: S3 bucket
        f_zarr = f'zarr-test-bucket/test1/lens/{CASE}'

        # Write data using xarray.to_zarr()
        fs = s3fs.S3FileSystem(anon=False)
        d = s3fs.S3Map(f_zarr, s3=fs)

        # time.clock() was removed in Python 3.8 and reported CPU time on Unix;
        # time.perf_counter() measures elapsed wall-clock time instead.
        start = time.perf_counter()
        dset.to_zarr(store=d, mode='w')
        print(f'Time taken = {time.perf_counter() - start}')
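
One caveat when interpreting the timings: on Unix, `time.clock()` reported the CPU time of the current process instead of elapsed time, and it was removed in Python 3.8. A minimal sketch of a timer that records both quantities, so the two can be compared, might look like the following. `stopwatch` is a hypothetical helper name, and the `time.sleep` call is only a stand-in for the `to_zarr` write.

```python
import time
from contextlib import contextmanager


@contextmanager
def stopwatch(label, results=None):
    """Record wall-clock and process CPU seconds for the enclosed block."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    try:
        yield
    finally:
        wall = time.perf_counter() - wall_start
        cpu = time.process_time() - cpu_start
        if results is not None:
            results[label] = {"wall_s": wall, "cpu_s": cpu}
        print(f"{label}: wall = {wall:.2f} s, cpu = {cpu:.2f} s")


if __name__ == "__main__":
    timings = {}
    with stopwatch("write", timings):
        # Stand-in for: dset.to_zarr(store=d, mode='w')
        time.sleep(0.2)
```

Process CPU time only counts work done in the calling process, so it can differ a lot from elapsed time when the work runs in separate worker processes or is dominated by network I/O.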