SAFE to Zarr notebook #186

Open
clausmichele wants to merge 1 commit into main from convert_SAFE_Zarr

Conversation

@clausmichele
Member

@senmao I'm trying to run the notebook in this PR to convert a sample product in JupyterLab. I tried with a Dask local cluster and with Dask Gateway without success. Could you help me please?

@clausmichele clausmichele requested a review from senmao November 5, 2025 07:58
@review-notebook-app

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.



@clausmichele
Member Author

@rtmiz maybe you could also try running the notebook and check whether you spot the issue.

@senmao

senmao commented Nov 7, 2025

@clausmichele It is not clear to me from the error message why the conversion failed. It looks like the Dask task was cancelled. The required memory for CPM 2.6.2 is usually more than 16 GB, sometimes even larger. Maybe you can increase the RAM and try again.

In general, the call to the eopf convert function is fine. Other important settings live in the eopf.toml file, especially the dask context part. Below is the dask context from my setup.

[dask_context]
cluster_type = "local"
[dask_context.cluster_config]
n_workers = 1
processes = false

@clausmichele
Member Author

@senmao I can't increase the local cluster memory; it's the one provided by EODC JupyterLab and is limited to 10 GB:
2025-11-11 15:19:31,447 - distributed.nanny.memory - WARNING - Ignoring provided memory limit 64GB due to system memory limit of 10.00 GiB
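As a side note (a minimal sketch, not from the thread): the nanny warning mixes decimal GB with binary GiB, and the mismatch it reports can be checked with plain arithmetic:

```python
# The warning above clamps a 64 GB request to a 10.00 GiB system limit.
requested = 64 * 10**9     # 64 GB, decimal (SI) units
system_limit = 10 * 2**30  # 10.00 GiB, binary units

print(f"system limit in bytes: {system_limit}")
print(f"request exceeds limit: {requested > system_limit}")
```

Since the clamp leaves roughly 10.7 GB per worker, the >16 GB requirement mentioned earlier cannot be met on this JupyterLab instance.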

Moreover, I also can't use the Dask Gateway because of environment incompatibilities, see: EOPF-Sample-Service/eopf-container-images#35

@clausmichele
Member Author

Dear @senmao, thanks to @Yuvraj198920 we now have a Docker image with eopf that can be used within Jupyter, named yuvraj1989/eopf-jupyterhub-dask:latest.

I'm able to use it for starting a Jupyter session from https://jupyterhub.user.eopf.eodc.eu/hub/home

I'm also using it to configure the Dask Gateway when calling the convert function in this way:

dask_config = {
    "cluster_type": "gateway",
    "proxy_address": "tls://dask.user.eopf.eodc.eu:10000",
    "cluster_config": {
        "address": "https://dask.user.eopf.eodc.eu",
        "auth": {
            "type": "jupyterhub",
        },
        "image": "yuvraj1989/eopf-jupyterhub-dask:latest",
        "worker_memory": 8,
        "n_workers": 16,
    },
    "client_config": {
        "timeout": "5000s",
    },
}

import os
from eopf.store.convert import convert
from eopf.common.constants import OpeningMode
from eopf.dask_utils.dask_context_utils import remote_dask_cluster_decorator

path_to_safe  = os.path.join(os.getcwd(), 'Sentinel-2/MSI/L2A/2024/01/15/S2A_MSIL2A_20240115T235221_N0510_R130_T55HGS_20240116T021554.SAFE')
path_to_zarr = os.path.join(os.getcwd(), 'S2A_MSIL2A_20240115T235221_N0510_R130_T55HGS_20240116T021554.zarr')

@remote_dask_cluster_decorator(dask_config)
def convert_to_native_python_type():
    # Add this parameter if you want to overwrite the output of the conversion if it already exists
    target_store_config = dict(mode=OpeningMode.CREATE_OVERWRITE)
    
    # Apply conversion
    convert(path_to_safe, path_to_zarr, target_store_kwargs=target_store_config)
convert_to_native_python_type()

Despite the really long timeout I've set, it times out after more than an hour without writing anything to disk. Could you please help me identify the issue?
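For readers unfamiliar with the pattern used above: a decorator like remote_dask_cluster_decorator typically wraps the function so that a cluster context is entered around the call. A hypothetical, heavily simplified sketch of the wrapping mechanics (not the actual EOPF implementation, and with the cluster setup replaced by prints):

```python
import functools

def cluster_decorator(config):
    """Hypothetical sketch: run the wrapped function inside a (stand-in) cluster context."""
    def wrap(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            # In EOPF this would start a Dask client against `config`;
            # here we only illustrate the wrapping pattern.
            print(f"entering cluster context: {config['cluster_type']}")
            try:
                return func(*args, **kwargs)
            finally:
                print("leaving cluster context")
        return inner
    return wrap

@cluster_decorator({"cluster_type": "gateway"})
def job():
    return "converted"

result = job()
print(result)  # converted
```

The `try`/`finally` ensures the context is always torn down, even if the wrapped conversion raises; the names `cluster_decorator` and `job` are invented for illustration.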

@senmao

senmao commented Jan 14, 2026

@clausmichele after some investigation, the timeout you encountered could be because proxy_address for dask_gateway is not correctly handled inside the EOPF package. After hacking a little on the EOPF DaskContext object, replacing the default Dask cluster with a new one, the processing starts to run but fails with a Dask computing error: DaskComputingError: Error occurred during dask computation on { ...... } .

However, this is a different error which needs to be investigated further. Below is the script with which I tried to replace the eopf default Dask cluster:

import os
import logging
from dask_gateway import Gateway
from eopf.logging import EOLogging
from eopf.store.convert import convert
from eopf.common.constants import OpeningMode
from eopf.dask_utils import ClusterType, DaskContext

dask_config = {
    "cluster_type": "gateway",
    "proxy_address": "tls://dask.user.eopf.eodc.eu:10000",
    "cluster_config": {
        "address": "https://dask.user.eopf.eodc.eu",
        "auth": {
            "type": "jupyterhub",
        },
        "image": "yuvraj1989/eopf-jupyterhub-dask:latest",
        "worker_memory": 8,
        "n_workers": 2,
    },
    "client_config": {
        "timeout": "600s",
    },
}


worker_memory = dask_config["cluster_config"]["worker_memory"]
worker_num = dask_config["cluster_config"]["n_workers"]

path_to_safe  = os.path.join(os.getcwd(), 'Sentinel-2/MSI/L2A/2024/01/15/S2A_MSIL2A_20240115T235221_N0510_R130_T55HGS_20240116T021554.SAFE')
path_to_zarr = os.path.join(os.getcwd(), 'S2A_MSIL2A_20240115T235221_N0510_R130_T55HGS_20240116T021554.zarr')

dc = DaskContext(
    cluster_type=ClusterType.GATEWAY,
    cluster_config=dask_config["cluster_config"],
    client_config=dask_config["client_config"],
) 

print (f"EOPF Default Dask cluster: {dc._cluster.name}")


## shutdown eopf cluster and create own dask cluster
gateway = Gateway(
    address="https://dask.user.eopf.eodc.eu",
    proxy_address="tls://dask.user.eopf.eodc.eu:10000",
    auth = "jupyterhub",
)
clusters = gateway.list_clusters()
if clusters:
    print("=> Available clusters:")
    for clst in clusters:
        print("shutdown:", clst.name)
        gateway.stop_cluster(clst.name)
else:
    print("No cluster available!")
cluster = gateway.new_cluster(
    image="yuvraj1989/eopf-jupyterhub-dask:latest",
    worker_memory=worker_memory,
)
cluster.scale(worker_num)
print (f"newly created cluster: {cluster.name}")
dc._cluster = cluster

print (f"EOPF NEW Dask cluster: {dc._cluster.name}")


#if True:
with dc:
    print("gateway proxy address: ", gateway.proxy_address)
    print("client scheduler info: ", dc.client.scheduler_info())

    print (f"cluster: {dc.cluster}")
    print (f"client: {dc.client}")
    
    target_store_config = dict(mode=OpeningMode.CREATE_OVERWRITE)
    convert(path_to_safe, path_to_zarr, target_store_kwargs=target_store_config)

@clausmichele
Member Author

@senmao could you check my last reply here? https://gitlab.eopf.copernicus.eu/cpm/eopf-cpm/-/issues/1006#note_66277

This is still pending and it would be nice to finally find a working solution.
