From 93e95f7ac12c2c912fe4ada9aa6cd703363c4ae2 Mon Sep 17 00:00:00 2001 From: Stefano Piani Date: Mon, 9 Jun 2025 11:34:53 +0200 Subject: [PATCH 1/2] Moved some functions into array_utils file --- src/ogs_riverger/efas/efas_manager.py | 138 ++------------------------ src/ogs_riverger/utils/array_utils.py | 132 ++++++++++++++++++++++++ 2 files changed, 138 insertions(+), 132 deletions(-) create mode 100644 src/ogs_riverger/utils/array_utils.py diff --git a/src/ogs_riverger/efas/efas_manager.py b/src/ogs_riverger/efas/efas_manager.py index 53f7ed9..c647d76 100644 --- a/src/ogs_riverger/efas/efas_manager.py +++ b/src/ogs_riverger/efas/efas_manager.py @@ -20,140 +20,14 @@ import ogs_riverger.efas.download_tools as efas_download from ogs_riverger.efas.download_tools import EfasDataSource from ogs_riverger.read_config import RiverConfigElement +from ogs_riverger.utils.array_utils import find_slice +from ogs_riverger.utils.array_utils import get_best_unique_element class InvalidEfasFile(Exception): pass -def _get_best_unique_element( - v: np.ndarray, weights: np.ndarray -) -> tuple[np.ndarray, np.ndarray]: - """Given an array `v`` that may contain repetitions of the same elements, - returns a tuple of two arrays. The first one is a sorted array where every - element of `v` appears just one, the second one is an array of indices - `index` such that `v[index]` is equal to the first array we return. An - index `i` of `index` is selected such that `weights[i] >= weights[j]` for - every other `j` such that `v[i] == v[j]`. - - This function is similar to `np.unique(return_index=True)` and behaves in - the same way if `weights == np.arange(len(v))[::-1]` (or any other - array in descending order). - """ - logger = logging.getLogger(f"{__name__}.{inspect.stack()[0][3]}") - - # Here we sort the v and the weights accordingly - sorting_indices = np.argsort(v) - weights = weights[sorting_indices] - v = v[sorting_indices] - - # Now we check the position of each element of v; indeed, - # unique_weights[i] contains the index of the first occurrence of the value - # unique_v[i] inside the vector `v`; since valid_time is sorted, - # from `unique_weights[i]` to `unique_weights[i + 1]` there are all the - # values with a specific value. There we have to choose the one with - # the maximum weight - unique_v, unique_weights = np.unique(v, return_index=True) - logger.debug( - "There are %s unique values; from %s to %s", - len(unique_v), - unique_v[0], - unique_v[-1], - ) - - # Here we save the indices of the elements that we select - corresponding_indices = np.empty_like(unique_v, dtype=np.int32) - - # We convert unique_weights to a list because otherwise the static - # typechecker of Pycharm wrongly reports an error - unique_weights: list[int] = list(unique_weights) - for i, i1 in enumerate(unique_weights): - i2 = len(v) if i == len(unique_weights) - 1 else unique_weights[i + 1] - - logger.debug( - "There are %s repetition of the element %s", - i2 - i1, - unique_v[i], - ) - # From i1 to i2 we have the same value; now we choose - # the index of the element with the maximum weight. This means - # that weight[index_when_sorted] is the maximum weight for the value - # we are considering. We also have to take into account that this - # index is valid only after we have sorted the arrays. To retrieve the - # original ones, we must use the "sorting_indices" array to go back - # to the original position - best_value_local_index = int(np.argmax(weights[i1:i2])) - index_when_sorted = i1 + best_value_local_index - - corresponding_indices[i] = sorting_indices[index_when_sorted] - - return unique_v, corresponding_indices - - -def _find_slice(original: np.ndarray, sliced: np.ndarray) -> tuple[int, int]: - """Finds the start and end indices of a slice obtained from a 1D array. - - This function takes an original 1D array `v` in ascending or descending - order and another array `w` that has been obtained by performing a slice - of `v` (i.e., `w = v[i:j]`). It calculates and returns the two indices - `i` and `j` that have been used to get the slice `w` from the original - array `v`. - - Args: - original (np.ndarray): The original 1D array in ascending or descending - order. - sliced (np.ndarray): The sliced array obtained from the original array - (w = v[i:j]). - - Returns: - tuple[int, int]: A tuple containing the start index `i` and end - index `j` used for the slice. - - Raises: - ValueError: If the sliced array is not a valid slice of the original - array. - - Example: - >>> v = np.array([1, 2, 3, 4, 5]) - >>> w = v[1:4] - >>> _find_slice(v, w) - (1, 4) - """ - if len(sliced) > len(original): - raise ValueError( - f"The size of the sliced array ({len(sliced)}) is greater than " - f"the size of the original one ({len(original)})" - ) - - # We consider the first entry of sliced, and we look for the element of - # `original` that is closest - i = np.argmin(np.abs(original - sliced[0])) - - # Convert `i` to a standard integer (from a np.int32 or np.int64) - i = int(i) - - # In this case, the original array ends before we found a corresponding - # element for each value of the slice - if i > len(original) - len(sliced): - raise ValueError( - f"The sliced array is not a valid slice of the original array. " - f"The first element of the sliced array is {sliced[0]} but it " - f"has position {i} of {len(original)} in the original array. " - f"Since sliced has length {len(sliced)}, it can not be a valid " - "slice of the original one." - ) - - j = i + sliced.size - - # Check if sliced is a valid slice of original - if not np.allclose(original[i:j], sliced, atol=0.5e-3): - raise ValueError( - "The sliced array is not a valid slice of the original array." - ) - - return i, j - - def generate_efas_domain_file(output_file: PathLike) -> Path: """Generate a single NetCDF4 file containing the coordinates of the entire EFAS domain. @@ -263,7 +137,7 @@ def read_efas_data_files( # Here we select only the most reliable forecast (the one that is closest # to the computation date) logger.debug("Getting the best forecast (if needed)") - _, slicing_indices = _get_best_unique_element( + _, slicing_indices = get_best_unique_element( concat_dataset.time.values, concat_dataset.computation_date.values ) @@ -491,8 +365,8 @@ def _read_raw_content_of_unzipped_efas_file( dataset_latitudes = single_ds.latitude.values dataset_longitudes = single_ds.longitude.values - i_lat1, i_lat2 = _find_slice(domain_latitudes, dataset_latitudes) - i_lon1, i_lon2 = _find_slice(domain_longitudes, dataset_longitudes) + i_lat1, i_lat2 = find_slice(domain_latitudes, dataset_latitudes) + i_lon1, i_lon2 = find_slice(domain_longitudes, dataset_longitudes) # Check that the position of the rivers is coherent with the # domain of the file we have downloaded @@ -648,7 +522,7 @@ def _read_efas_forecast_file(dataset: xr.Dataset): time = time.ravel() valid_time = valid_time.ravel() - _, corresponding_indices = _get_best_unique_element(valid_time, time) + _, corresponding_indices = get_best_unique_element(valid_time, time) logger.debug("Association completed! Now we perform the slicing...") # Here instead we unravel the indices, obtaining the correct indices diff --git a/src/ogs_riverger/utils/array_utils.py b/src/ogs_riverger/utils/array_utils.py new file mode 100644 index 0000000..739c8f5 --- /dev/null +++ b/src/ogs_riverger/utils/array_utils.py @@ -0,0 +1,132 @@ +import inspect +import logging + +import numpy as np + + +def get_best_unique_element( + v: np.ndarray, weights: np.ndarray +) -> tuple[np.ndarray, np.ndarray]: + """Given an array `v`` that may contain repetitions of the same elements, + returns a tuple of two arrays. The first one is a sorted array where every + element of `v` appears just one, the second one is an array of indices + `index` such that `v[index]` is equal to the first array we return. An + index `i` of `index` is selected such that `weights[i] >= weights[j]` for + every other `j` such that `v[i] == v[j]`. + + This function is similar to `np.unique(return_index=True)` and behaves in + the same way if `weights == np.arange(len(v))[::-1]` (or any other + array in descending order). + """ + logger = logging.getLogger(f"{__name__}.{inspect.stack()[0][3]}") + + # Here we sort the v and the weights accordingly + sorting_indices = np.argsort(v) + weights = weights[sorting_indices] + v = v[sorting_indices] + + # Now we check the position of each element of v; indeed, + # unique_weights[i] contains the index of the first occurrence of the value + # unique_v[i] inside the vector `v`; since valid_time is sorted, + # from `unique_weights[i]` to `unique_weights[i + 1]` there are all the + # values with a specific value. There we have to choose the one with + # the maximum weight + unique_v, unique_weights = np.unique(v, return_index=True) + logger.debug( + "There are %s unique values; from %s to %s", + len(unique_v), + unique_v[0], + unique_v[-1], + ) + + # Here we save the indices of the elements that we select + corresponding_indices = np.empty_like(unique_v, dtype=np.int32) + + # We convert unique_weights to a list because otherwise the static + # typechecker of Pycharm wrongly reports an error + unique_weights: list[int] = list(unique_weights) + for i, i1 in enumerate(unique_weights): + i2 = len(v) if i == len(unique_weights) - 1 else unique_weights[i + 1] + + logger.debug( + "There are %s repetition of the element %s", + i2 - i1, + unique_v[i], + ) + # From i1 to i2 we have the same value; now we choose + # the index of the element with the maximum weight. This means + # that weight[index_when_sorted] is the maximum weight for the value + # we are considering. We also have to take into account that this + # index is valid only after we have sorted the arrays. To retrieve the + # original ones, we must use the "sorting_indices" array to go back + # to the original position + best_value_local_index = int(np.argmax(weights[i1:i2])) + index_when_sorted = i1 + best_value_local_index + + corresponding_indices[i] = sorting_indices[index_when_sorted] + + return unique_v, corresponding_indices + + +def find_slice(original: np.ndarray, sliced: np.ndarray) -> tuple[int, int]: + """Finds the start and end indices of a slice obtained from a 1D array. + + This function takes an original 1D array `v` in ascending or descending + order and another array `w` that has been obtained by performing a slice + of `v` (i.e., `w = v[i:j]`). It calculates and returns the two indices + `i` and `j` that have been used to get the slice `w` from the original + array `v`. + + Args: + original (np.ndarray): The original 1D array in ascending or descending + order. + sliced (np.ndarray): The sliced array obtained from the original array + (w = v[i:j]). + + Returns: + tuple[int, int]: A tuple containing the start index `i` and end + index `j` used for the slice. + + Raises: + ValueError: If the sliced array is not a valid slice of the original + array. + + Example: + >>> v = np.array([1, 2, 3, 4, 5]) + >>> w = v[1:4] + >>> _find_slice(v, w) + (1, 4) + """ + if len(sliced) > len(original): + raise ValueError( + f"The size of the sliced array ({len(sliced)}) is greater than " + f"the size of the original one ({len(original)})" + ) + + # We consider the first entry of sliced, and we look for the element of + # `original` that is closest + i = np.argmin(np.abs(original - sliced[0])) + + # Convert `i` to a standard integer (from a np.int32 or np.int64) + i = int(i) + + # In this case, the original array ends before we found a corresponding + # element for each value of the slice + if i > len(original) - len(sliced): + raise ValueError( + f"The sliced array is not a valid slice of the original array. " + f"The first element of the sliced array is {sliced[0]} but it " + f"has position {i} of {len(original)} in the original array. " + f"Since sliced has length {len(sliced)}, it can not be a valid " + "slice of the original one." + ) + + j = i + sliced.size + + # Check if sliced is a valid slice of original + if not np.allclose(original[i:j], sliced, atol=0.5e-3): + raise ValueError( + "The sliced array is not a valid slice of the original array." + ) + + return i, j From d1555a2261bba5570bd2b7beba15b9eccb4151ae Mon Sep 17 00:00:00 2001 From: Stefano Piani Date: Thu, 12 Jun 2025 12:24:08 +0200 Subject: [PATCH 2/2] Adding retries when listing the files of the FTP server --- src/ogs_riverger/efas/download_tools.py | 64 ++++++++++++++++++++----- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/src/ogs_riverger/efas/download_tools.py b/src/ogs_riverger/efas/download_tools.py index 85140a4..83c6013 100644 --- a/src/ogs_riverger/efas/download_tools.py +++ b/src/ogs_riverger/efas/download_tools.py @@ -792,7 +792,7 @@ def _should_be_downloaded( return True, None async def _get_remote_server_available_files( - self, + self, retries: int = 10 ) -> dict[str, tuple[Path, int]]: """Checks which files are available on the server. @@ -800,22 +800,64 @@ async def _get_remote_server_available_files( files stored in this directory. The method returns a dictionary that maps each file name to its corresponding path and size. + Args: + retries: The number of retries to perform. If the method fails, it + will try again this number of times before raising an error. + Returns: A dictionary where keys are file names (str) and values are tuples of two elements such that: - the first element of the tuple is the file's path on the server. - the second element of the tuple is the file's size in bytes. """ - async with aioftp.Client.context( - EFAS_FTP_URL, - port=EFAS_FTP_PORT, - user=self._user, - password=self._password.get_secret_value(), - ) as client: - available_files = { - p.name: (p, int(p_stat["size"])) - for p, p_stat in await client.list(EFAS_FTP_DIR) - } + logger = logging.getLogger(f"{__name__}.{inspect.stack()[0][3]}") + for attempt in range(retries): + try: + logger.debug( + "Connecting to %s to check the available files", + EFAS_FTP_URL, + ) + async with aioftp.Client.context( + EFAS_FTP_URL, + port=EFAS_FTP_PORT, + user=self._user, + password=self._password.get_secret_value(), + socket_timeout=30, + connection_timeout=30, + ) as client: + logger.debug( + "Connection opened; retrieving the list of files" + ) + available_files = { + p.name: (p, int(p_stat["size"])) + for p, p_stat in await client.list(EFAS_FTP_DIR) + } + except Exception as e: + if attempt == retries - 1: + logger.error( + "Failed to retrieve the list of files from the FTP " + "server for %d times; we will not try again. The last " + "exception was: ", + retries, + exc_info=e, + ) + raise + else: + wait_for = attempt * 4 + logger.warning( + "Failed to retrieve the list of files from the FTP " + "server (attempt number %d): retrying again in %d " + "seconds;", + wait_for, + attempt + 1, + exc_info=e, + ) + await asyncio.sleep(wait_for) + + logger.debug( + "Data received; %d files are available on the server", + len(available_files), + ) return available_files async def _single_download(