Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions domains/CentralMediterranean.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
name: CentralMediterranean
geometry:
type: basin
uuid: mid3
268 changes: 248 additions & 20 deletions src/medunda/domains/domain.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import argparse
import logging
import re
import tempfile
import zipfile
from abc import ABC
from abc import abstractmethod
from collections import deque
from pathlib import Path
from typing import Literal
from typing import Union
Expand All @@ -24,16 +26,90 @@
LOGGER = logging.getLogger(__name__)


class InvalidBasinUUIDError(ValueError):
pass


class BoundingBox(BaseModel):
"""
Represents a geographical bounding box with optional depth constraints.

This class embodies a rectangular geographical region defined by latitude
and longitude boundaries, optionally including depth constraints.
The objects of this class are used to define the slices used by the
providers to select the data from the remote datasets.

Attributes:
minimum_depth: Optional lower bound for depth constraint.
maximum_depth: Optional upper bound for depth constraint.
minimum_latitude: The southernmost latitude of the bounding box.
maximum_latitude: The northernmost latitude of the bounding box.
minimum_longitude: The westernmost longitude of the bounding box.
maximum_longitude: The easternmost longitude of the bounding box.
"""

minimum_depth: float | None
maximum_depth: float | None
minimum_latitude: float
maximum_latitude: float
minimum_longitude: float
maximum_longitude: float

@classmethod
def merge(cls, *bboxes: "BoundingBox") -> "BoundingBox":
"""Create a new BoundingBox that covers all the given bounding boxes.

This method returns the smallest bounding box that contains all the
bounding boxes provided as inputs
"""
if len(bboxes) == 0:
raise ValueError("No bounding boxes provided")
if len(bboxes) == 1:
return bboxes[0]
minimum_latitude = min(bbox.minimum_latitude for bbox in bboxes)
maximum_latitude = max(bbox.maximum_latitude for bbox in bboxes)
minimum_longitude = min(bbox.minimum_longitude for bbox in bboxes)
maximum_longitude = max(bbox.maximum_longitude for bbox in bboxes)

minimum_depth: float | None = None
maximum_depth: float | None = None

if all(bbox.minimum_depth is not None for bbox in bboxes):
minimum_depth = min(bbox.minimum_depth for bbox in bboxes) # pyright: ignore[reportArgumentType]

if all(bbox.maximum_depth is not None for bbox in bboxes):
maximum_depth = max(bbox.maximum_depth for bbox in bboxes) # pyright: ignore[reportArgumentType]

return BoundingBox(
minimum_depth=minimum_depth,
maximum_depth=maximum_depth,
minimum_latitude=minimum_latitude,
maximum_latitude=maximum_latitude,
minimum_longitude=minimum_longitude,
maximum_longitude=maximum_longitude,
)


class Domain(BaseModel, ABC):
"""
Represents a domain in a computational or data-handling context.

Attributes:
name (str): The name of the domain. Must conform to the naming
convention disallowing special characters.
bounding_box (BoundingBox): The bounding box representing the spatial
boundaries of the domain.

Methods:
name_is_conformal(name: str) -> str:
Validates that the domain name complies with the naming standard by
disallowing special characters. Returns the name if valid.

compute_selection_mask(dataset: xr.Dataset) -> np.ndarray:
Abstract method to compute a selection mask over a dataset.
Must be implemented by subclasses.
"""

name: str
bounding_box: BoundingBox

Expand Down Expand Up @@ -135,18 +211,166 @@ def create_from_shapely_poly(
)


ConcreteDomain = Union[RectangularDomain, PolygonalDomain]
class MultiPolygonalDomain(Domain):
type: Literal["MultiPolygonalDomain"] = "MultiPolygonalDomain"
polygons: list[PolygonalDomain]

def compute_selection_mask(self, dataset: "xr.Dataset") -> "np.ndarray":
latitudes = dataset.latitude.values[:, np.newaxis]
longitudes = dataset.longitude.values

data_shape = np.broadcast_shapes(latitudes.shape, longitudes.shape)
mask = np.zeros(data_shape, dtype=bool)

if len(self.polygons) == 0:
return mask

for polygonal_domain in self.polygons:
poly_mask = polygonal_domain.compute_selection_mask(
dataset=dataset
)
mask = np.logical_or(mask, poly_mask)

return mask

@classmethod
def create_from_polygons(cls, name: str, polygons: list[PolygonalDomain]):
bounding_box = BoundingBox.merge(*[p.bounding_box for p in polygons])
return cls(name=name, bounding_box=bounding_box, polygons=polygons)


ConcreteDomain = Union[
RectangularDomain, PolygonalDomain, MultiPolygonalDomain
]


def _read_path(raw_path: str):
raw_path = raw_path.replace("${MAIN_DIR}", str(MAIN_DIR))
return Path(raw_path)


def domain_from_basin(basin_uuid: str, name: str | None = None):
Comment thread
spiani marked this conversation as resolved.
"""
Generates a domain representation from a given basin identifier.

This function constructs a domain representation from a basin specified by
its UUID. If the basin is a composed entity, it processes its components
recursively to generate the appropriate domain representation. The result
could be either a simple polygonal domain or a multipolygonal domain,
depending on the type of the basin. It ensures that unsupported basin
types are appropriately handled by raising an error.

Parameters:
basin_uuid (str): The unique identifier for the basin to process.
name (str | None, optional): The optional name to assign to the
created domain. If not provided, the name is derived from the
basin UUID.

Returns:
PolygonalDomain | MultiPolygonalDomain: The created domain
representing the given basin. Returns a PolygonalDomain for
simple basins, and a MultiPolygonalDomain for composed basins.

Raises:
InvalidBasinUUIDError: If the provided basin UUID is invalid
ValueError: If processing a composed basin that has no sub-basins.
NotImplementedError: If encountering a basin type that is not
supported (only SimplePolygonalBasins are supported).
"""
LOGGER.debug("Loading basin %s", basin_uuid)
try:
basin = bitsea_basin.Basin.load_from_uuid(basin_uuid)
except Exception as e:
raise InvalidBasinUUIDError(
f"Could not load basin {basin_uuid}"
) from e

if name is None:
name = basin_uuid.split(".")[-1]

to_be_checked = deque([basin])
simple_basins = {}

# If the basin is composed, add its components to the list of simple basins
# that we have to check
while len(to_be_checked) > 0:
current_basin = to_be_checked.pop()
basin_uuid = current_basin.get_uuid()
LOGGER.debug("Checking if basin %s is a composed basin", basin_uuid)
if basin_uuid in simple_basins:
LOGGER.debug(
"Basin %s has already been visited! Skipped", basin_uuid
)
continue
if hasattr(current_basin, "basin_list"):
LOGGER.debug(
"Basin %s is composed! Adding its components to the list of "
"basins that we have to check",
basin_uuid,
)
for basin_component in reversed(current_basin.basin_list):
LOGGER.debug(
"Adding basin %s to the list of basins that we have to check",
basin_component.get_uuid(),
)
to_be_checked.append(basin_component)
else:
LOGGER.debug(
"Basin %s is not composed! Adding it to the list of simple basins",
basin_uuid,
)
simple_basins[basin_uuid] = current_basin

if len(simple_basins) == 0:
raise ValueError(
f"The basin {basin_uuid} is a composed basin but it does not "
"contain any other basin"
)

if len(simple_basins) == 1:
basin = list(simple_basins.values())[0]
if not hasattr(basin, "borders"):
raise NotImplementedError(
f"Medunda only supports SimplePolygonalBasins, your current "
f"basin is a {type(basin)}"
)

longitudes = [p[0] for p in basin.borders]
latitudes = [p[1] for p in basin.borders]

return PolygonalDomain.create_from_coordinates(
name=name,
longitudes=longitudes,
latitudes=latitudes,
)

polygons = []
for current_basin in simple_basins.values():
if not hasattr(current_basin, "borders"):
raise NotImplementedError(
f"Medunda only supports SimplePolygonalBasins, but basin "
f"{current_basin.name} (that is part of your composed basin "
f"{basin.name}) is a {type(current_basin)}"
)
Comment thread
spiani marked this conversation as resolved.
longitudes = [p[0] for p in current_basin.borders]
latitudes = [p[1] for p in current_basin.borders]

current_polygon = PolygonalDomain.create_from_coordinates(
name=current_basin.name,
longitudes=longitudes,
latitudes=latitudes,
)
polygons.append(current_polygon)

return MultiPolygonalDomain.create_from_polygons(
name=name, polygons=polygons
)


def read_zipped_shapefile(compressed_path: Path, temporary_dir: Path):
"""
Read the content of a zipped file that contains
only one file (among the others) with an exstension .shp
only one file (among the others) with an extension .shp
and return the path to the uncompressed shapefile.
"""
with zipfile.ZipFile(compressed_path, "r") as zip_ref:
Expand Down Expand Up @@ -178,8 +402,8 @@ def read_zipped_shapefile(compressed_path: Path, temporary_dir: Path):
return shapefiles[0]


def read_domain(domain_description: Path) -> ConcreteDomain:
yaml_content = domain_description.read_text()
def read_domain_from_yaml(yaml_path: Path) -> ConcreteDomain:
yaml_content = yaml_path.read_text()
domain_description_raw = yaml.safe_load(yaml_content)

# Read the name
Expand Down Expand Up @@ -279,24 +503,28 @@ def read_domain(domain_description: Path) -> ConcreteDomain:
latitudes=poly.border_latitudes,
)
elif geo_type == "basin":
basin_uuid = geometry["uuid"]
basin = bitsea_basin.Basin.load_from_uuid(basin_uuid)

if not hasattr(basin, "borders"):
raise NotImplementedError(
f"Medunda only supports SimplePolygonalBasins, your current "
f"basin is a {type(basin)}"
)
longitudes = [p[0] for p in basin.borders]
latitudes = [p[1] for p in basin.borders]

return PolygonalDomain.create_from_coordinates(
name=name,
longitudes=longitudes,
latitudes=latitudes,
)
return domain_from_basin(geometry["uuid"], name=name)

raise Exception(
"The geometry type you have chosen should have been implemented "
"but, because of a bug of the code, it is not recognized"
)


def domain_from_string(domain_description: str) -> ConcreteDomain:
if domain_description.startswith("basin:"):
basin_uuid = domain_description[len("basin:") :]
try:
return domain_from_basin(basin_uuid=basin_uuid, name=basin_uuid)
except InvalidBasinUUIDError as e:
raise argparse.ArgumentTypeError(str(e)) from e

yaml_path = _read_path(domain_description)
if not yaml_path.is_file():
raise argparse.ArgumentTypeError(
f"The provided domain description {domain_description} is not a "
"valid file path nor a valid basin identifier (it should start "
"with 'basin:')"
)

return read_domain_from_yaml(yaml_path)
12 changes: 5 additions & 7 deletions src/medunda/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from medunda.components.geodata import GeoDataCollection
from medunda.components.variables import VariableDataset
from medunda.domains.domain import ConcreteDomain
from medunda.domains.domain import read_domain
from medunda.domains.domain import domain_from_string
from medunda.providers import PROVIDERS
from medunda.providers import get_provider
from medunda.tools.argparse_utils import date_from_str
Expand Down Expand Up @@ -100,9 +100,9 @@ def configure_parser(
)
create_subparser.add_argument(
"--domain",
type=Path,
type=domain_from_string,
required=True,
help="Choose the domain",
help="Choose the domain (path to a YAML file, or 'basin:<uuid>')",
)

create_subparser.add_argument(
Expand Down Expand Up @@ -312,15 +312,13 @@ def validate_dataset(filepath, variable, max_depth: float | None):

def downloader(args):
if args.action == "create":
domain = read_domain(args.domain)

downloaded_files = download_data(
variables=args.variables,
output_dir=args.output_dir,
frequency=Frequency(args.frequency),
start=args.start_date,
end=args.end_date,
domain=domain,
domain=args.domain,
split_by=args.split_by,
provider_class_name=args.provider,
provider_config=args.provider_config,
Expand All @@ -331,7 +329,7 @@ def downloader(args):
if validate_dataset(
filepath,
variable,
max_depth=domain.bounding_box.maximum_depth,
max_depth=args.domain.bounding_box.maximum_depth,
):
LOGGER.info(
f"dataset validated for variable: '{variable}'"
Expand Down
Loading