65 changes: 0 additions & 65 deletions InteroperabilityEnabler/utils/annotation_dataset.py

This file was deleted.

53 changes: 10 additions & 43 deletions InteroperabilityEnabler/utils/data_mapper.py
@@ -11,13 +11,10 @@


def data_mapper(
context_df: pd.DataFrame, time_series_df: pd.DataFrame, sep="__"
) -> dict:
context_df: pd.DataFrame, time_series_df: pd.DataFrame, sep="__") -> dict:
"""
Maps data from context and time series DataFrames into a structured dictionary format,
while organizing instance-level quality annotations and grouping attributes from time
series data. The function ensures proper nesting of "hasQuality" fields, utilizes a
custom separator for splitting field names, and preserves timestamp data in ISO 8601 format.
while grouping attributes from time series data.

Args:
context_df (pd.DataFrame): The context DataFrame, expected to contain a single row
@@ -32,35 +29,21 @@ def data_mapper(
dict: A dictionary containing context-level attributes along with grouped and
timestamped attribute data from the time series DataFrame.
"""
# Extract context as dict
# Extract context as dictionary (single row)
context = context_df.iloc[0].to_dict()

# Handle instance-level hasQuality annotation from context
instance_type_key = f"hasQuality{sep}type"
instance_object_key = f"hasQuality{sep}object"
if instance_type_key in context and instance_object_key in context:
if pd.notna(context[instance_type_key]) and pd.notna(
context[instance_object_key]
):
context["hasQuality"] = {
"type": context.pop(instance_type_key),
"object": context.pop(instance_object_key),
}
else:
context.pop(instance_type_key, None)
context.pop(instance_object_key, None)

# Prepare time series attribute grouping
# Initialize attribute grouping: attr -> list of observation dicts
attribute_groups = {}

for _, row in time_series_df.iterrows():
ts = row["observedAt"]
ts_iso = datetime.utcfromtimestamp(int(ts)).strftime("%Y-%m-%dT%H:%M:%SZ")

# Temporarily collect fields for each attribute
attr_temp = {}

for col, val in row.items():
if col == "observedAt":
if col == "observedAt": # or pd.isna(val):
continue

if sep in col:
@@ -69,29 +52,13 @@
attr_temp[attr] = {}
attr_temp[attr][field] = val

# Add observedAt to each attribute's dict
for attr, data in attr_temp.items():
data["observedAt"] = ts_iso

# Detect and nest hasQuality fields if present
hq_type_key = "hasQuality" + sep + "type"
hq_obj_key = "hasQuality" + sep + "object"

hq_type = data.pop(hq_type_key, None)
hq_obj = data.pop(hq_obj_key, None)

# if pd.notna(hq_type) and pd.notna(hq_obj):
# data["hasQuality"] = {"type": hq_type, "object": hq_obj}

# Always add hasQuality key, with None if missing
data["hasQuality"] = {
"type": None if pd.isna(hq_type) else hq_type,
"object": None if pd.isna(hq_obj) else hq_obj,
}

# Store observations per attribute
if attr not in attribute_groups:
attribute_groups[attr] = []
attribute_groups[attr].append(data)

# Merge and return
return {**context, **attribute_groups}
# Merge context and time-series attributes
final_json = {**context, **attribute_groups}
return final_json
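
For reference, a minimal sketch of the simplified mapper in action. The DataFrames below are hypothetical, constructed to match the column conventions visible in this diff (`observedAt` as epoch seconds, attribute fields flattened as `<attr>__<field>`):

```python
import pandas as pd
from InteroperabilityEnabler.utils.data_mapper import data_mapper

# Hypothetical single-row context DataFrame (entity-level attributes).
context_df = pd.DataFrame([{
    "id": "urn:ngsi-ld:MonitoringSite:urn:sedimark:station:1",
    "type": "MonitoringSite",
}])

# Hypothetical time-series DataFrame: epoch-second timestamps plus
# flattened "<attr>__<field>" columns, as data_formatter produces them.
time_series_df = pd.DataFrame({
    "observedAt": [1704067200, 1704070800],
    "no2__type": ["Property", "Property"],
    "no2__value": [21.0, 23.5],
})

result = data_mapper(context_df, time_series_df, sep="__")
# result == {
#     "id": "urn:ngsi-ld:MonitoringSite:urn:sedimark:station:1",
#     "type": "MonitoringSite",
#     "no2": [
#         {"type": "Property", "value": 21.0, "observedAt": "2024-01-01T00:00:00Z"},
#         {"type": "Property", "value": 23.5, "observedAt": "2024-01-01T01:00:00Z"},
#     ],
# }
```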
26 changes: 1 addition & 25 deletions README.md
@@ -6,7 +6,6 @@ Interoperability Enabler (IE) component is designed to facilitate seamless integ
## Key Feature

- Data Formatter - Convert JSON data (time-series data) into the SEDIMARK internal processing format (pandas DataFrames)
- Data Quality Annotations - Enable adding any kind of quality annotations to data inside pandas DataFrames
- Data Mapper – Convert data from pandas DataFrames into JSON
- Data Extractor – Extract relevant data from a pandas DataFrame
- Metadata Restorer – Restore metadata to a pandas DataFrame
@@ -37,7 +36,6 @@ InteroperabilityEnabler
│ └── utils
│ ├── __init__.py
│ ├── add_metadata.py
│ ├── annotation_dataset.py
│ ├── data_formatter.py
│ ├── data_mapper.py
│ ├── extract_data.py
@@ -64,34 +62,12 @@ FILE_PATH="sample.json"
context_df, time_series_df = data_formatter(FILE_PATH)
```
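
The contents of `sample.json` are not shown in this diff; a plausible minimal input, inferred from the fields the tests exercise, is a single NGSI-LD-style entity. The formatter also accepts already-parsed JSON, as the tests in this PR do:

```python
from InteroperabilityEnabler.utils.data_formatter import data_formatter

# Hypothetical minimal entity, inferred from the tests in this PR;
# the real sample.json may carry more attributes and metadata.
sample = {
    "id": "urn:ngsi-ld:MonitoringSite:urn:sedimark:station:1",
    "type": "MonitoringSite",
    "no2": [
        {"type": "Property", "value": 21.0, "observedAt": "2024-01-01T00:00:00Z"},
        {"type": "Property", "value": 23.5, "observedAt": "2024-01-01T01:00:00Z"},
    ],
}

context_df, time_series_df = data_formatter(sample, sep="__")
```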


#### Data Quality Annotations (to enrich pandas DataFrames by adding quality annotations)

Instance-level annotations:
```python
from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df

context_df, annotated_df = add_quality_annotations_to_df(
context_df, time_series_df, assessed_attrs=None
)
```

Attribute-level annotation:
```python
from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df

assessed_attrs = ["no"] # Base attribute name
context_df, annotated_df = add_quality_annotations_to_df(
context_df, time_series_df, assessed_attrs=assessed_attrs
)
```

#### Data Mapper (to convert the DataFrame into JSON format)

```python
from InteroperabilityEnabler.utils.data_mapper import data_mapper

data_json = data_mapper(context_df, annotated_df)
data_json = data_mapper(context_df, time_series_df)
```
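
With the annotation step gone, `data_json` is simply the single-row context merged with one list of timestamped observations per attribute. An indicative shape (hypothetical values):

```python
data_json["type"]  # "MonitoringSite"
data_json["no2"]   # [{"type": "Property", "value": 21.0,
                   #   "observedAt": "2024-01-01T00:00:00Z"}, ...]
```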

#### Data Extractor (to extract and return specific columns from a pandas DataFrame)
24 changes: 1 addition & 23 deletions README_package.md
@@ -6,7 +6,6 @@ Interoperability Enabler (IE) component is designed to facilitate seamless integ
## Key Feature

- Data Formatter - Convert JSON data (time-series data) into the SEDIMARK internal processing format (pandas DataFrames)
- Data Quality Annotations - Enable adding any kind of quality annotations to data inside pandas DataFrames
- Data Mapper – Convert data from pandas DataFrames into JSON
- Data Extractor – Extract relevant data from a pandas DataFrame
- Metadata Restorer – Restore metadata to a pandas DataFrame
@@ -33,33 +32,12 @@ FILE_PATH="sample.json"
context_df, time_series_df = data_formatter(FILE_PATH)
```

#### Data Quality Annotations (to enrich pandas DataFrames by adding quality annotations)

Instance-level annotations:
```python
from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df

context_df, annotated_df = add_quality_annotations_to_df(
context_df, time_series_df, assessed_attrs=None
)
```

Attribute-level annotation:
```python
from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df

assessed_attrs = ["no"] # Base attribute name
context_df, annotated_df = add_quality_annotations_to_df(
context_df, time_series_df, assessed_attrs=assessed_attrs
)
```

#### Data Mapper (to convert the DataFrame into JSON format)

```python
from InteroperabilityEnabler.utils.data_mapper import data_mapper

data_json = data_mapper(context_df, annotated_df)
data_json = data_mapper(context_df, time_series_df)
```

#### Data Extractor (to extract and return specific columns from a pandas DataFrame)
96 changes: 0 additions & 96 deletions tests/test_basic.py
@@ -1,7 +1,6 @@
import pandas as pd
import pytest
from InteroperabilityEnabler.utils.data_formatter import data_formatter
from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df
from io import StringIO
from InteroperabilityEnabler.utils.merge_data import merge_predicted_data
from InteroperabilityEnabler.utils.extract_data import extract_columns
@@ -58,81 +57,6 @@ def test_data_formatter(file_path):



@pytest.mark.parametrize("file_path", [FILE_PATH_JSON])
def test_instance_level_annotation(file_path):
"""
Data quality annotation component tests.
Instance-level annotations.
"""
# Load JSON data
with open(file_path, "r", encoding="utf-8") as f:
json_data = json.load(f)

# Convert to DataFrames
context_df, time_series_df = data_formatter(json_data, sep="__")

# Apply instance-level annotation
updated_context_df, updated_time_series_df = add_quality_annotations_to_df(
context_df, time_series_df, sep="__", assessed_attrs=None
)

# Assertions for context-level quality annotation
assert "hasQuality__type" in updated_context_df.columns
assert "hasQuality__object" in updated_context_df.columns

assert updated_context_df.loc[0, "hasQuality__type"] == "Relationship"
assert updated_context_df.loc[0, "hasQuality__object"] == (
"urn:ngsi-ld:DataQualityAssessment:MonitoringSite:urn:sedimark:station:1"
)

# Time-series DataFrame should remain unchanged
assert "pm10__hasQuality__type" not in updated_time_series_df.columns



@pytest.mark.parametrize("file_path", [FILE_PATH_JSON])
def test_attribute_level_annotation(file_path):
"""
Data quality annotation component tests.
Attribute-level annotation.
"""
# Load JSON data from file
with open(file_path, "r", encoding="utf-8") as f:
json_data = json.load(f)

# Convert to DataFrames
context_df, time_series_df = data_formatter(json_data, sep="__")

# Apply attribute-level annotation on 'pm10'
updated_context_df, updated_time_series_df = add_quality_annotations_to_df(
context_df,
time_series_df,
sep="__",
assessed_attrs=["pm10"]
)

# Check that new quality columns are added for 'pm10'
assert "pm10__hasQuality__type" in updated_time_series_df.columns
assert "pm10__hasQuality__object" in updated_time_series_df.columns

# Ensure all annotated rows have correct values
expected_object_uri = (
"urn:ngsi-ld:DataQualityAssessment:MonitoringSite:urn:sedimark:station:1:pm10"
)

for i in range(len(updated_time_series_df)):
has_value = pd.notna(updated_time_series_df.loc[i, "pm10__value"])
expected_type = "Relationship" if has_value else None
expected_obj = expected_object_uri if has_value else None

assert updated_time_series_df.loc[i, "pm10__hasQuality__type"] == expected_type
assert updated_time_series_df.loc[i, "pm10__hasQuality__object"] == expected_obj

# Confirm context_df is unchanged (no instance-level fields)
assert "hasQuality__type" not in updated_context_df.columns



@pytest.mark.parametrize("file_path", [FILE_PATH_JSON])
def test_data_mapper(file_path):
"""
@@ -146,14 +70,6 @@ def test_data_mapper(file_path):
# Format data
context_df, time_series_df = data_formatter(json_data, sep="__")

# Apply attribute-level annotation on 'no2'
context_df, time_series_df = add_quality_annotations_to_df(
context_df,
time_series_df,
sep="__",
assessed_attrs=["no2"]
)

# Map back to JSON structure
mapped_data = data_mapper(context_df, time_series_df, sep="__")

@@ -163,18 +79,6 @@
assert mapped_data["type"] == "MonitoringSite"
assert "no2" in mapped_data

# Check at least one annotation exists for 'no2'
no2_values = mapped_data["no2"]
assert isinstance(no2_values, list)

found_annotated = any(
"hasQuality" in item and
item["hasQuality"]["type"] == "Relationship" and
item["hasQuality"]["object"].endswith(":no2")
for item in no2_values
)
assert found_annotated, "No attribute-level annotation found for no2"


def test_extract_columns_valid_indices():
"""
Expand Down