Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 5 additions & 59 deletions backend/ibex/core/ibex_service.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,13 @@
"""Logic between endpoint and data sources"""

import time
import re
from pathlib import Path
from functools import wraps # for measure_execution_time()
from typing import Any, Callable, Optional, Sequence, List

from ibex.data_source.imas_python_source import IMASPythonSource
from ibex.data_source.exception import CannotGenerateUriException
from dataclasses import dataclass


@dataclass
class IMAS_URI:
    """
    Helper class to extract arguments from an IMAS URI of the form
    ``<entry identifiers>#<ids name>[:<occurrence>][/<node path>]``.
    """

    #: Full URI containing pulse file identifier, ids name and path to node
    full_uri: str = ""

    #: pulse file identifier extracted from full URI
    uri_entry_identifiers: str = ""
    #: fragment part from full URI containing ids name and path to node
    uri_fragment: str = ""
    #: ids name extracted from full URI
    ids_name: str = ""
    #: path to node extracted from full URI
    node_path: str = ""
    #: ids occurrence number extracted from full URI
    occurrence: int = 0

    def __init__(self, full_uri: str):
        """
        IMAS_URI constructor

        :param full_uri: pulsefile uri along with #fragment part
        """

        self.full_uri = full_uri

        # No fragment: the whole URI identifies the data entry; the
        # fragment-derived fields keep their class-level defaults.
        if "#" not in self.full_uri:
            self.uri_entry_identifiers = self.full_uri
            return

        self.uri_entry_identifiers, self.uri_fragment = self.full_uri.split("#", 1)

        # Fragment layout: <idsname>[:<occurrence>][/<node_path>]
        pattern = r"^(?P<idsname>[^:/]+)(?::(?P<occurrence>[^/]*))?(?:/(?P<node_path>.*))?$"

        match = re.match(pattern, self.uri_fragment)

        if not match:
            # Malformed fragment: keep the defaults.
            return

        self.ids_name = match.group("idsname") or ""
        self.node_path = match.group("node_path") or ""

        # The regex captures the occurrence as text, but the field is declared
        # as int (and downstream APIs take ``occurrence: int``), so convert
        # digit strings and fall back to 0 for empty or non-numeric values.
        occurrence_text = match.group("occurrence")
        self.occurrence = int(occurrence_text) if occurrence_text and occurrence_text.isdigit() else 0

    def __str__(self):
        return (
            f"FULL URI : {self.full_uri}\n"
            f"URI : {self.uri_entry_identifiers}\n"
            f"FRAGMENT : {self.uri_fragment}\n"
            f"IDS : {self.ids_name}\n"
            f"OCCURRENCE : {self.occurrence}\n"
            f"NODE_PATH : {self.node_path}\n"
        )
from ibex.core.utils import IMAS_URI


# helper decorator used during development
Expand Down Expand Up @@ -172,13 +115,16 @@ def get_multiple_node_data(uri: str) -> dict:
)


def get_plot_data(uri: str, downsampling_method: str | None, downsampled_size: int) -> dict:
def get_plot_data(
    uri: str,
    interpolate_over: List[str] | None = None,
    downsampling_method: str | None = None,
    downsampled_size: int = 1000,
) -> dict:
    """
    Split *uri* into its IMAS components and fetch plot data from the data source.

    :param uri: full IMAS URI including the ``#<ids>[:<occurrence>][/<node>]`` fragment.
    :param interpolate_over: optional list of URIs whose coordinates the data
        should be interpolated onto.
    :param downsampling_method: optional name of the downsampling method.
    :param downsampled_size: target number of points after downsampling.
    :return: plot data dictionary produced by the data source.
    """
    # Defaults mirror the endpoint defaults so keyword callers may omit the
    # optional arguments instead of having to pass every parameter.
    uri_obj = IMAS_URI(uri)
    return data_source.get_plot_data(
        uri=uri_obj.uri_entry_identifiers,
        ids=uri_obj.ids_name,
        node_path=uri_obj.node_path,
        occurrence=uri_obj.occurrence,
        interpolate_over=interpolate_over,
        downsampling_method=downsampling_method,
        downsampled_size=downsampled_size,
    )
58 changes: 58 additions & 0 deletions backend/ibex/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from ibex.data_source.exception import NotAnArrayException, InvalidParametersException

import numpy as np # type: ignore
from dataclasses import dataclass
import re


def find_first_value_in_list(data: list):
Expand Down Expand Up @@ -212,3 +214,59 @@ def downsample_data(data: List, target_size: int, method: str | None = None, x=N
return x, data[s_ds]

return x, data[s_ds]


@dataclass
class IMAS_URI:
    """
    Helper class to extract arguments from an IMAS URI of the form
    ``<entry identifiers>#<ids name>[:<occurrence>][/<node path>]``.
    """

    #: Full URI containing pulse file identifier, ids name and path to node
    full_uri: str = ""

    #: pulse file identifier extracted from full URI
    uri_entry_identifiers: str = ""
    #: fragment part from full URI containing ids name and path to node
    uri_fragment: str = ""
    #: ids name extracted from full URI
    ids_name: str = ""
    #: path to node extracted from full URI
    node_path: str = ""
    #: ids occurrence number extracted from full URI
    occurrence: int = 0

    def __init__(self, full_uri: str):
        """
        IMAS_URI constructor

        :param full_uri: pulsefile uri along with #fragment part
        """

        self.full_uri = full_uri

        # No fragment: the whole URI identifies the data entry; the
        # fragment-derived fields keep their class-level defaults.
        if "#" not in self.full_uri:
            self.uri_entry_identifiers = self.full_uri
            return

        self.uri_entry_identifiers, self.uri_fragment = self.full_uri.split("#", 1)

        # Fragment layout: <idsname>[:<occurrence>][/<node_path>]
        pattern = r"^(?P<idsname>[^:/]+)(?::(?P<occurrence>[^/]*))?(?:/(?P<node_path>.*))?$"

        match = re.match(pattern, self.uri_fragment)

        if not match:
            # Malformed fragment: keep the defaults.
            return

        self.ids_name = match.group("idsname") or ""
        self.node_path = match.group("node_path") or ""

        # The regex captures the occurrence as text, but the field is declared
        # as int (and downstream APIs take ``occurrence: int``), so convert
        # digit strings and fall back to 0 for empty or non-numeric values.
        occurrence_text = match.group("occurrence")
        self.occurrence = int(occurrence_text) if occurrence_text and occurrence_text.isdigit() else 0

    def __str__(self):
        return (
            f"FULL URI : {self.full_uri}\n"
            f"URI : {self.uri_entry_identifiers}\n"
            f"FRAGMENT : {self.uri_fragment}\n"
            f"IDS : {self.ids_name}\n"
            f"OCCURRENCE : {self.occurrence}\n"
            f"NODE_PATH : {self.node_path}\n"
        )
71 changes: 70 additions & 1 deletion backend/ibex/data_source/imas_python_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@
InvalidParametersException,
)
from ibex.core.utils import downsample_data, transform_2D_data, find_first_value_in_list
from ibex.core.utils import IMAS_URI
from ibex.data_source.imas_python_source_utils import (
convert_ids_data_into_numpy_array,
resample_data,
pad_to_rectangular,
flatten,
)


class IMASPythonSource(DataSourceInterface):
Expand Down Expand Up @@ -586,6 +593,7 @@ def get_plot_data(
ids: str,
node_path: str,
occurrence: int = 0,
interpolate_over: List[str] | None = None,
downsampling_method: str | None = None,
downsampled_size: int = 1000,
):
Expand Down Expand Up @@ -764,7 +772,68 @@ def get_plot_data(
}
coordinates_to_be_returned.append(c)
first_value = find_first_value_in_list(ids_data)
data_to_be_returned = ids_data
data_to_be_returned = convert_ids_data_into_numpy_array(ids_data)

# ============= BEGIN resample data onto new time vector =============

def convert_to_lists(data):
    # Recursively convert numpy / IMAS numeric arrays found inside nested
    # lists into plain Python lists (via .tolist()) so their values can be
    # merged and deduplicated with sorted(set(flatten(...))) below.
    # Scalars and other types pass through unchanged.
    if isinstance(data, list):
        return [convert_to_lists(d) for d in data]
    elif isinstance(data, (np.ndarray, IDSNumericArray)):
        return data.tolist()
    else:
        return data

if interpolate_over:
# =================== GATHER ALL COORDINATES ===================
original_coord_values = []
new_common_coords = coordinates_to_be_returned
for c in new_common_coords:
c["value"] = convert_to_lists(c["value"])
original_coord_values.append(sorted(set(flatten(c["value"]))))
original_coord_values.reverse()

for _uri in interpolate_over:
_uri_obj = IMAS_URI(_uri)

if _uri_obj.ids_name != ids or _uri_obj.node_path != node_path:
raise InvalidParametersException(
"IDS name and node path should be the same for source and target URI when interpolating data"
)

interpolate_to_coordinates = self.get_plot_data(
uri=_uri_obj.uri_entry_identifiers,
ids=_uri_obj.ids_name,
node_path=_uri_obj.node_path,
occurrence=_uri_obj.occurrence,
downsampling_method=downsampling_method,
downsampled_size=downsampled_size,
)["data"]["coordinates"]

if len(interpolate_to_coordinates) != len(coordinates_to_be_returned):
message = "Interpolation error. Source and target nodes have different number of coordinates."
raise InvalidParametersException(message)

for x, y in zip(coordinates_to_be_returned, interpolate_to_coordinates):
if x["name"] != y["name"]:
# coordinates between quantities doesn't match
message = f"Interpolation error. Coordinates names does not match between target and source nodes ({x['name']} vs. {y['name']})."
raise InvalidParametersException(message)

x["value"] = sorted(set(flatten(x["value"]) + flatten(convert_to_lists(y["value"]))))

# reverse coordinates list so it matches data dimensions
new_common_coords.reverse()
common_coords_values = [c["value"] for c in new_common_coords]
# =================== INTERPOLATE ===================

# === make data vector rectangular ===
data_to_be_returned = pad_to_rectangular(data_to_be_returned)
data_to_be_returned = resample_data(
tuple(original_coord_values), data_to_be_returned, tuple(common_coords_values)
)

# ============= END resample data onto new time vector =============

if first_value.metadata.ndim == 2:
# Transform 2D arrays.
Expand Down
101 changes: 101 additions & 0 deletions backend/ibex/data_source/imas_python_source_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from functools import reduce

import numpy as np
from imas.ids_primitive import IDSNumericArray
from scipy.interpolate import RegularGridInterpolator


def union_arrays(data: list):
    """
    Return the union of all arrays in *data* as one sorted array of unique values.

    :param data: list of array-like objects to merge.
    :return: NumPy array with the sorted union; an empty array for empty input.
        Note: a single-element input is returned as-is (no sorting/dedup),
        matching ``functools.reduce`` semantics.
    """
    # reduce() without an initial value raises TypeError on an empty
    # iterable; return an empty array instead.
    if not data:
        return np.array([])
    return reduce(np.union1d, data)


def flatten(lst):
    """
    Flatten arbitrarily nested lists into a single flat list.

    :param lst: possibly nested list.
    :return: flat list of all non-list leaves, in depth-first (left-to-right) order.
    """
    flat = []
    # Explicit work stack; children are pushed reversed so pop() walks the
    # structure left to right, matching a recursive depth-first traversal.
    pending = list(reversed(lst))
    while pending:
        current = pending.pop()
        if isinstance(current, list):
            pending.extend(reversed(current))
        else:
            flat.append(current)
    return flat


def get_max_shape(lst, level=0, shape=None):
    """
    Returns shape of irregular array. Result contains maximum array length in every dimension.

    :param lst: input array (nested lists and/or numpy arrays).
    :param level: depth at which *lst* sits (used internally).
    :param shape: partially built shape list to update in place (used internally).
    :return: list of per-dimension maximum lengths; empty list for a scalar.
    """
    if shape is None:
        shape = []

    # Breadth-first walk over (node, depth) pairs instead of recursion;
    # only the per-depth maximum length matters, so visit order is irrelevant.
    pending = [(lst, level)]
    cursor = 0
    while cursor < len(pending):
        node, depth = pending[cursor]
        cursor += 1
        if isinstance(node, (list, np.ndarray)):
            if len(shape) <= depth:
                shape.append(0)
            if len(node) > shape[depth]:
                shape[depth] = len(node)
            pending.extend((child, depth + 1) for child in node)

    return shape


def fill_array(arr, lst, index=()):
    """
    Recursively fills an array with values from a nested list.

    Descends into numpy arrays as well as lists — consistent with
    ``get_max_shape``, which counts ndarray dimensions — so ragged rows whose
    ndarray leaves are shorter than the target shape are written element-wise
    (leaving the padding values untouched) instead of raising a numpy
    broadcast error on whole-subarray assignment.

    :param arr: Array-like object supporting tuple indexing.
    :param lst: Nested list (possibly containing numpy arrays) with values to insert.
    :param index: Current index used during recursion.
    :return: None (modifies arr in place).
    """
    if isinstance(lst, (list, np.ndarray)):
        for i, item in enumerate(lst):
            fill_array(arr, item, index + (i,))
    else:
        arr[index] = lst


def pad_to_rectangular(lst):
    """
    Converts a nested list into a rectangular NumPy array by padding
    missing values with NaN.

    :param lst: Nested list with uneven lengths.
    :return: NumPy array with NaN padding.
    """
    # Allocate an all-NaN array of the maximal per-dimension extent, then
    # copy the actual values over it; untouched slots remain NaN padding.
    target_shape = tuple(get_max_shape(lst))
    padded = np.full(target_shape, np.nan)
    fill_array(padded, lst)
    return padded


def resample_data(original_coords: list, data: list, target_coords: list):
    """
    Resamples data onto new set of coordinates via linear interpolation.

    :param original_coords: List of original data coordinates (one 1-D vector per axis).
    :param data: Nested n-dimensional data array defined on the original grid.
    :param target_coords: List of target coordinates (one 1-D vector per axis).
    :return: Resampled data array shaped after the target grid; points outside
        the original grid become NaN (interpolator default fill value).
    """
    interpolate = RegularGridInterpolator(original_coords, data, bounds_error=False)

    # Cartesian product of the target axes flattened into an (N, ndim) array
    # of sample points, e.g. [[x1, y1, ...], [x2, y2, ...], ...].
    axes = np.meshgrid(*target_coords, indexing="ij")
    sample_points = np.stack(axes, axis=-1).reshape(-1, len(target_coords))

    flat_result = interpolate(sample_points)

    # Restore the n-dimensional shape of the target grid.
    target_shape = tuple(len(axis) for axis in target_coords)
    return flat_result.reshape(target_shape)


def convert_ids_data_into_numpy_array(data: list):
    """
    Recursively replace IDSNumericArray leaves in nested lists with their
    underlying numpy values (``.value``); all other objects pass through
    unchanged.

    :param data: nested list possibly containing IDSNumericArray objects.
    :return: structure of the same shape with IDS arrays unwrapped.
    """
    if isinstance(data, list):
        converted = []
        for element in data:
            converted.append(convert_ids_data_into_numpy_array(element))
        return converted
    if isinstance(data, IDSNumericArray):
        return data.value
    return data
13 changes: 10 additions & 3 deletions backend/ibex/endpoints/data.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Endpoints extracting data from data source"""

import orjson
from typing import List, Any
from typing import List, Any, Optional

from fastapi import APIRouter, Query # type: ignore
from fastapi.responses import ORJSONResponse # type: ignore
Expand Down Expand Up @@ -74,7 +74,12 @@ def field_value(
description="Returns single (or tensorized) data node value with detailed parameters used to plot the data",
)
@ibex_service.measure_execution_time
def plot_data(uri: str, downsampling_method: str | None = Query(None), downsampled_size: int = 1000) -> Any:
def plot_data(
uri: str,
interpolate_over: Optional[List[str]] = Query(None),
downsampling_method: str | None = Query(None),
downsampled_size: int = 1000,
) -> Any:
"""
IBEX endpoint. Prepares and returns full information about data node and it's coordinates.

Expand Down Expand Up @@ -114,4 +119,6 @@ def plot_data(uri: str, downsampling_method: str | None = Query(None), downsampl
:rtype: dict (automatically converted to JSON by FastAPI)
:return: JSON response
"""
return CustomORJSONResponse(ibex_service.get_plot_data(uri.strip(), downsampling_method, downsampled_size))
return CustomORJSONResponse(
ibex_service.get_plot_data(uri.strip(), interpolate_over, downsampling_method, downsampled_size)
)
Loading