From 3862e26c46ee6fe656b6e3df9af08d60d0ef14a8 Mon Sep 17 00:00:00 2001 From: shahin Date: Fri, 1 Aug 2025 01:06:04 +0200 Subject: [PATCH 1/7] update unit tests --- tests/example_json.json | 238 +++++++++++++++++++++++++++++++++---- tests/test_basic.py | 255 ++++++++++++++++++---------------------- 2 files changed, 328 insertions(+), 165 deletions(-) diff --git a/tests/example_json.json b/tests/example_json.json index 056672c..fb1a8ae 100644 --- a/tests/example_json.json +++ b/tests/example_json.json @@ -1,36 +1,230 @@ { - "id": "urn:ngsild:Vehicle:vehicle:MobilityManagement:196671", - "type": "Vehicle", - "category": {"type": "Property", "value": "tracked"}, - "vehicleNumber": {"type": "Property", "value": "379131"}, - "battery": [ + "id": "urn:sedimark:station:1", + "type": "MonitoringSite", + "specificAccessPolicy": [ { "type": "Property", - "value": 1, - "observedAt": "2024-09-25T04:30:06Z", - "unitCode": "P1" + "value": "AUTH_WRITE" + } + ], + "pm10": [ + { + "type": "Property", + "value": 22.9, + "instanceId": "urn:ngsi-ld:Instance:32e14421-e740-4722-8f6f-00922a7d9bf1", + "observedAt": "2025-07-24T12:00:00Z" + }, + { + "type": "Property", + "value": 24.2, + "instanceId": "urn:ngsi-ld:Instance:0c4f6936-aa78-45bd-a225-8c7f487726f5", + "observedAt": "2025-07-24T13:00:00Z" + }, + { + "type": "Property", + "value": 23, + "instanceId": "urn:ngsi-ld:Instance:8166c0ed-38cc-42c6-b148-6fded91bc39b", + "observedAt": "2025-07-24T14:00:00Z" + } + ], + "pnci": [ + { + "type": "Property", + "value": 6983, + "instanceId": "urn:ngsi-ld:Instance:8b3be0e3-4166-4cba-b5e5-b7d2f57f57a3", + "observedAt": "2025-07-24T12:00:00Z" + }, + { + "type": "Property", + "value": 7028, + "instanceId": "urn:ngsi-ld:Instance:2ec355ba-d151-454f-86fb-60f210f782ad", + "observedAt": "2025-07-24T13:00:00Z" }, { "type": "Property", - "value": 0.98, - "observedAt": "2024-09-24T16:42:24Z", - "unitCode": "P1" + "value": 6765, + "instanceId": "urn:ngsi-ld:Instance:3f902dc0-6969-496d-802d-9bc3fcb66eac", + "observedAt": "2025-07-24T14:00:00Z" } ], - "location": [ + "no": [ { - "type": "GeoProperty", - "value": {"type": "Point", "coordinates": [43.460405, -3.853312]}, - "observedAt": "2024-09-24T15:45:58Z" + "type": "Property", + "value": 3, + "instanceId": "urn:ngsi-ld:Instance:ad99ae3c-aef3-49ea-894e-7cfa484411ef", + "observedAt": "2025-07-24T12:00:00Z" }, { - "type": "GeoProperty", - "value": {"type": "Point", "coordinates": [43.459994, -3.820141]}, - "observedAt": "2024-09-24T15:09:14Z" + "type": "Property", + "value": 3.9, + "instanceId": "urn:ngsi-ld:Instance:83ab9f5c-f592-43d6-b4bd-2b33e8746c7c", + "observedAt": "2025-07-24T13:00:00Z" + }, + { + "type": "Property", + "value": 2.6, + "instanceId": "urn:ngsi-ld:Instance:2b0f1a1e-dc04-47cf-b442-0337d1243113", + "observedAt": "2025-07-24T14:00:00Z" } ], - "@context": [ - "https://raw.githubusercontent.com/smart-data-models/dataModel.ERA/master/context.jsonld", - "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context-v1.7.jsonld" + "indeksi": [ + { + "type": "Property", + "value": 61, + "instanceId": "urn:ngsi-ld:Instance:5fb47305-23c9-42a0-b1d1-948d788bded8", + "observedAt": "2025-07-24T12:00:00Z" + }, + { + "type": "Property", + "value": 61, + "instanceId": "urn:ngsi-ld:Instance:17fbc692-b0c0-487c-96e3-4eb46be507d8", + "observedAt": "2025-07-24T13:00:00Z" + }, + { + "type": "Property", + "value": 60, + "instanceId": "urn:ngsi-ld:Instance:6f1344a7-ad6f-4a61-9c4c-ea96e56c41e9", + "observedAt": "2025-07-24T14:00:00Z" + } + ], + "pm25": [ + { + "type": "Property", + "value": 16.8, + 
"instanceId": "urn:ngsi-ld:Instance:584ec5f5-0aca-4ec3-9ba5-35f628eaff46", + "observedAt": "2025-07-24T12:00:00Z" + }, + { + "type": "Property", + "value": 16.6, + "instanceId": "urn:ngsi-ld:Instance:72f5e615-0dd3-4971-9a26-812e15eec47b", + "observedAt": "2025-07-24T13:00:00Z" + }, + { + "type": "Property", + "value": 16.3, + "instanceId": "urn:ngsi-ld:Instance:efddb845-b38e-4ff3-b34a-7e3f3dc402aa", + "observedAt": "2025-07-24T14:00:00Z" + } + ], + "no2": [ + { + "type": "Property", + "value": 10.9, + "instanceId": "urn:ngsi-ld:Instance:56c842df-0861-4e9c-84b5-ebb536f6ab54", + "observedAt": "2025-07-24T12:00:00Z" + }, + { + "type": "Property", + "value": 10.8, + "instanceId": "urn:ngsi-ld:Instance:6fb8fbd6-8049-43fe-8cf3-54ceb341d2de", + "observedAt": "2025-07-24T13:00:00Z" + }, + { + "type": "Property", + "value": 9.6, + "instanceId": "urn:ngsi-ld:Instance:54969cbf-d553-4d26-a3ef-f1c0b7d53882", + "observedAt": "2025-07-24T14:00:00Z" + } + ], + "https://vocab.egm.io/pm25": [ + { + "type": "Property", + "value": 6.6, + "instanceId": "urn:ngsi-ld:Instance:fb243b31-e8f0-4880-be7c-a27d7e5a0826", + "observedAt": "2025-04-17T06:00:00Z" + }, + { + "type": "Property", + "value": 7, + "instanceId": "urn:ngsi-ld:Instance:71faec96-9f9a-493d-b959-a891ff8b7c10", + "observedAt": "2025-04-17T07:00:00Z" + }, + { + "type": "Property", + "value": 7, + "instanceId": "urn:ngsi-ld:Instance:4e1438f1-97c6-462c-9947-40afb8a0e080", + "observedAt": "2025-04-17T08:00:00Z" + } + ], + "ldsa": [ + { + "type": "Property", + "value": 13.3, + "instanceId": "urn:ngsi-ld:Instance:f75c79c8-9075-4ecd-8a6b-5ccf264a2b29", + "observedAt": "2025-07-24T12:00:00Z" + }, + { + "type": "Property", + "value": 13.1, + "instanceId": "urn:ngsi-ld:Instance:188cc122-e714-4da6-8182-00511e42729d", + "observedAt": "2025-07-24T13:00:00Z" + }, + { + "type": "Property", + "value": 12.5, + "instanceId": "urn:ngsi-ld:Instance:2621e03c-da76-487f-a326-79225c12c46e", + "observedAt": "2025-07-24T14:00:00Z" + } + ], + "https://vocab.egm.io/pm10": [ + { + "type": "Property", + "value": 11.3, + "instanceId": "urn:ngsi-ld:Instance:d78fe8be-0c9a-460b-a696-ad60c8ddb5da", + "observedAt": "2025-04-17T06:00:00Z" + }, + { + "type": "Property", + "value": 11.4, + "instanceId": "urn:ngsi-ld:Instance:eb2c5e91-cd24-4223-bb5b-d6d32e7faf7a", + "observedAt": "2025-04-17T07:00:00Z" + }, + { + "type": "Property", + "value": 11.5, + "instanceId": "urn:ngsi-ld:Instance:239870a1-22c6-430b-be85-828f3848c16f", + "observedAt": "2025-04-17T08:00:00Z" + } + ], + "https://vocab.egm.io/no2": [ + { + "type": "Property", + "value": 10.5, + "instanceId": "urn:ngsi-ld:Instance:b4ac0ff4-ea60-4696-85cc-de998380777f", + "observedAt": "2025-04-17T06:00:00Z" + }, + { + "type": "Property", + "value": 26.1, + "instanceId": "urn:ngsi-ld:Instance:f9078b14-1845-47c9-8ebc-829325ad0d14", + "observedAt": "2025-04-17T07:00:00Z" + }, + { + "type": "Property", + "value": 36, + "instanceId": "urn:ngsi-ld:Instance:1103c1eb-353f-42dc-a672-e81950e893b3", + "observedAt": "2025-04-17T08:00:00Z" + } + ], + "bc": [ + { + "type": "Property", + "value": 0.43, + "instanceId": "urn:ngsi-ld:Instance:5e17b87b-e6e4-4bc7-9095-8fce8456f35d", + "observedAt": "2025-07-24T12:00:00Z" + }, + { + "type": "Property", + "value": 0.59, + "instanceId": "urn:ngsi-ld:Instance:54412842-086f-40f3-bd7b-910a9243be21", + "observedAt": "2025-07-24T13:00:00Z" + }, + { + "type": "Property", + "value": 0.51, + "instanceId": "urn:ngsi-ld:Instance:16471be2-77f4-40ac-9c6e-e02fe1937340", + "observedAt": "2025-07-24T14:00:00Z" + } ] -} +} \ 
No newline at end of file diff --git a/tests/test_basic.py b/tests/test_basic.py index 715ca07..9f9577f 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -3,44 +3,15 @@ import pytest from InteroperabilityEnabler.utils.data_formatter import data_to_dataframe from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df -from InteroperabilityEnabler.utils.data_mapper import data_conversion, restore_ngsi_ld_structure from io import StringIO from InteroperabilityEnabler.utils.merge_data import merge_predicted_data from InteroperabilityEnabler.utils.extract_data import extract_columns from InteroperabilityEnabler.utils.add_metadata import add_metadata_to_predictions_from_dataframe +from InteroperabilityEnabler.utils.data_mapper import data_mapper +import json -FILE_PATH_JSON = "tests/example_json.json" - -# Expected values to validate -DATA = { - "id": "urn:ngsild:Vehicle:vehicle:MobilityManagement:196671", - "type": "Vehicle", - "category.type": "Property", - "category.value": "tracked", - "vehicleNumber.type": "Property", - "vehicleNumber.value": "379131", - "battery[0].type": "Property", - "battery[0].value": 1, - "battery[0].observedAt": "2024-09-25T04:30:06Z", - "battery[0].unitCode": "P1", - "battery[1].type": "Property", - "battery[1].value": 0.98, - "battery[1].observedAt": "2024-09-24T16:42:24Z", - "battery[1].unitCode": "P1", - "location[0].type": "GeoProperty", - "location[0].value.type": "Point", - "location[0].value.coordinates": [43.460405, -3.853312], - "location[0].observedAt": "2024-09-24T15:45:58Z", - "location[1].type": "GeoProperty", - "location[1].value.type": "Point", - "location[1].value.coordinates": [43.459994, -3.820141], - "location[1].observedAt": "2024-09-24T15:09:14Z", - "@context": [ - "https://raw.githubusercontent.com/smart-data-models/dataModel.ERA/master/context.jsonld", - "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context-v1.7.jsonld", - ], -} +FILE_PATH_JSON = "example_json.json" MOCK_CSV = """ @@ -67,129 +38,127 @@ @pytest.mark.parametrize("file_path", [FILE_PATH_JSON]) -def test_json_to_dataframe(file_path): - """ - Data Formatter component tests: JSON to DataFrame - """ - print("\nJData Formatter component tests: JSON to DataFrame.") - df = data_to_dataframe(file_path) - assert isinstance(df, pd.DataFrame), f"{file_path} did not return a DataFrame" - assert not df.empty, f"{file_path} returned an empty DataFrame" - row = df.loc[0] - for key, expected_value in DATA.items(): - assert key in row, f"Missing key '{key}' in DataFrame from {file_path}" - assert ( - row[key] == expected_value - ), f"Mismatch for '{key}' in {file_path}: expected {expected_value}, got {row[key]}" - - -def test_instance_level_annotation(): - """ - Data Quality Annotation component tests. - Entire instance level annotation. - """ - print( - "\nData Quality Annotation component tests: entire instance level annotation." 
+def test_data_formatter(file_path): + # Load the JSON file from disk + with open(file_path, "r", encoding="utf-8") as f: + json_data = json.load(f) + + # Run the formatter + context_df, time_series_df = data_to_dataframe(json_data, sep="__") + + # Assertions (use `assert`, not `assertEqual`) + assert context_df.iloc[0]["id"] == "urn:sedimark:station:1" + assert context_df.iloc[0]["type"] == "MonitoringSite" + assert "pm10__value" in time_series_df.columns + assert "pnci__value" in time_series_df.columns + assert time_series_df["pm10__value"].iloc[3] == 22.9 + + + +@pytest.mark.parametrize("file_path", [FILE_PATH_JSON]) +def test_instance_level_annotation(file_path): + # Load JSON data + with open(file_path, "r", encoding="utf-8") as f: + json_data = json.load(f) + + # Convert to DataFrames + context_df, time_series_df = data_to_dataframe(json_data, sep="__") + + # Apply instance-level annotation + updated_context_df, updated_time_series_df = add_quality_annotations_to_df( + context_df, time_series_df, sep="__", assessed_attrs=None ) - df = pd.DataFrame(DATA) - result = add_quality_annotations_to_df( - data=df, - entity_type="Vehicle", - assessed_attrs=None, + + # Assertions for context-level quality annotation + assert "hasQuality__type" in updated_context_df.columns + assert "hasQuality__object" in updated_context_df.columns + + assert updated_context_df.loc[0, "hasQuality__type"] == "Relationship" + assert updated_context_df.loc[0, "hasQuality__object"] == ( + "urn:ngsi-ld:DataQualityAssessment:MonitoringSite:urn:sedimark:station:1" ) - assert "hasQuality.type" in result.columns - assert "hasQuality.object" in result.columns - assert result.loc[0, "hasQuality.type"] == "Relationship" - assert ( - result.loc[0, "hasQuality.object"] - == "urn:ngsi-ld:DataQualityAssessment:Vehicle:urn:ngsild:Vehicle:vehicle:MobilityManagement:196671" + + # Time-series DataFrame should remain unchanged + assert "pm10__hasQuality__type" not in updated_time_series_df.columns + + + +@pytest.mark.parametrize("file_path", [FILE_PATH_JSON]) +def test_attribute_level_annotation(file_path): + # Load JSON data from file + with open(file_path, "r", encoding="utf-8") as f: + json_data = json.load(f) + + # Convert to DataFrames + context_df, time_series_df = data_to_dataframe(json_data, sep="__") + + # Apply attribute-level annotation on 'pm10' + updated_context_df, updated_time_series_df = add_quality_annotations_to_df( + context_df, + time_series_df, + sep="__", + assessed_attrs=["pm10"] ) + # Check that new quality columns are added for 'pm10' + assert "pm10__hasQuality__type" in updated_time_series_df.columns + assert "pm10__hasQuality__object" in updated_time_series_df.columns -def test_attribute_level_annotation(): - """ - Data Quality Annotation component tests. - Attribute level annotation. 
- """ - print("\nData Quality Annotation component tests: attribute level annotation.") - df = pd.DataFrame(DATA) - result = add_quality_annotations_to_df( - data=df, entity_type="Vehicle", assessed_attrs=["battery"] + # Ensure all annotated rows have correct values + expected_object_uri = ( + "urn:ngsi-ld:DataQualityAssessment:MonitoringSite:urn:sedimark:station:1:pm10" ) - for i in [0, 1]: # because your example has battery[0] and battery[1] - type_col = f"battery[{i}].hasQuality.type" - object_col = f"battery[{i}].hasQuality.object" - assert type_col in result.columns - assert object_col in result.columns - assert result.loc[0, type_col] == "Relationship" - assert result.loc[0, object_col] == ( - f"urn:ngsi-ld:DataQualityAssessment:Vehicle:urn:ngsild:Vehicle:vehicle:MobilityManagement:196671:battery" - ) + for i in range(len(updated_time_series_df)): + has_value = pd.notna(updated_time_series_df.loc[i, "pm10__value"]) + expected_type = "Relationship" if has_value else None + expected_obj = expected_object_uri if has_value else None + assert updated_time_series_df.loc[i, "pm10__hasQuality__type"] == expected_type + assert updated_time_series_df.loc[i, "pm10__hasQuality__object"] == expected_obj -def test_granular_level_annotation(): - """ - Data Quality Annotation component tests. - Granular level annotation. - """ - print("\nData Quality Annotation component tests: granular level annotation.") - df = pd.DataFrame(DATA) - result = add_quality_annotations_to_df( - data=df, entity_type="Vehicle", assessed_attrs=["battery[0]"] - ) - assert "battery[0].hasQuality.type" in result.columns - assert "battery[0].hasQuality.object" in result.columns - assert result.loc[0, "battery[0].hasQuality.type"] == "Relationship" - assert ( - result.loc[0, "battery[0].hasQuality.object"] - == "urn:ngsi-ld:DataQualityAssessment:Vehicle:urn:ngsild:Vehicle:vehicle:MobilityManagement:196671:battery" + # Confirm context_df is unchanged (no instance-level fields) + assert "hasQuality__type" not in updated_context_df.columns + + + +@pytest.mark.parametrize("file_path", [FILE_PATH_JSON]) +def test_data_mapper(file_path): + # Load JSON data + with open(file_path, "r", encoding="utf-8") as f: + json_data = json.load(f) + + # Format data + context_df, time_series_df = data_to_dataframe(json_data, sep="__") + + # Apply attribute-level annotation on 'no2' + context_df, time_series_df = add_quality_annotations_to_df( + context_df, + time_series_df, + sep="__", + assessed_attrs=["no2"] ) + # Map back to JSON structure + mapped_data = data_mapper(context_df, time_series_df, sep="__") -def test_data_mapper(): - """ - Data Mapper component tests. 
- JSON to NGSI-LD - """ - print("\nData Mapper component tests: JSON to NGSI-LD") - df = pd.DataFrame([DATA]) - ngsi_ld_data = data_conversion(df) - assert isinstance(ngsi_ld_data, list) - assert len(ngsi_ld_data) == 1 - entity = ngsi_ld_data[0] - assert entity["id"] == DATA["id"] - assert entity["type"] == DATA["type"] - assert entity["category"]["type"] == "Property" - assert entity["category"]["value"] == "tracked" - assert entity["vehicleNumber"]["type"] == "Property" - assert entity["vehicleNumber"]["value"] == "379131" - assert "battery[0]" in entity - assert entity["battery[0]"]["type"] == "Property" - assert entity["battery[0]"]["value"] == 1 - assert entity["battery[0]"]["observedAt"] == "2024-09-25T04:30:06Z" - assert "location[1]" in entity - assert entity["location[1]"]["value"]["type"] == "Point" - assert entity["location[1]"]["value"]["coordinates"] == [43.459994, -3.820141] - assert "@context" in entity - - -def test_restore_ngsi_ld_structure(): - """ - Data Mapper component tests. - Restore NGSI-LD structure. - """ - print("\nData Mapper component tests: NGSI-LD structure restoration.") - df = pd.DataFrame([DATA]) - ngsi_ld_data = data_conversion(df) - restored = restore_ngsi_ld_structure(ngsi_ld_data[0]) - assert "battery" in restored - assert isinstance(restored["battery"], list) - assert restored["battery"][0]["type"] == "Property" - assert restored["battery"][1]["value"] == 0.98 - assert "location" in restored - assert isinstance(restored["location"], list) - assert restored["location"][1]["value"]["coordinates"] == [43.459994, -3.820141] + # Assertions + assert isinstance(mapped_data, dict) + assert mapped_data["id"] == "urn:sedimark:station:1" + assert mapped_data["type"] == "MonitoringSite" + assert "no2" in mapped_data + + # Check at least one annotation exists for 'no2' + no2_values = mapped_data["no2"] + assert isinstance(no2_values, list) + + found_annotated = any( + "hasQuality" in item and + item["hasQuality"]["type"] == "Relationship" and + item["hasQuality"]["object"].endswith(":no2") + for item in no2_values + ) + assert found_annotated, "No attribute-level annotation found for no2" def test_extract_columns_valid_indices(): From 030ff2feec3578dea0666f5f83ad8d427bfced77 Mon Sep 17 00:00:00 2001 From: shahin Date: Fri, 1 Aug 2025 01:10:02 +0200 Subject: [PATCH 2/7] update data quality annotations component --- .../utils/annotation_dataset.py | 95 ++++++++----------- 1 file changed, 37 insertions(+), 58 deletions(-) diff --git a/InteroperabilityEnabler/utils/annotation_dataset.py b/InteroperabilityEnabler/utils/annotation_dataset.py index ce74ec2..f3c0be7 100644 --- a/InteroperabilityEnabler/utils/annotation_dataset.py +++ b/InteroperabilityEnabler/utils/annotation_dataset.py @@ -9,78 +9,57 @@ def add_quality_annotations_to_df( - data, entity_type, assessed_attrs=None, type=None, context_value=None + context_df, time_series_df, sep="__", assessed_attrs=None ): """ - Add quality annotations to a DataFrame for either - instance-level or attribute-level annotations (but not both). + Add NGSI-LD quality annotations to either the context (instance-level) + or the time series (attribute-level). Args: - data (DataFrame): The flattened NGSI-LD data. - entity_type (str): The NGSI-LD entity type for quality annotations. - assessed_attrs (list of str): To annotate with quality information (if None, annotate entire instance). - type (str): The default `type` for the DataFrame rows if not already exist. 
- context_value (str or list): The value to assign to the `@context` column if it does not exist. + context_df (pd.DataFrame): Single-row DataFrame with 'id' and 'type'. + time_series_df (pd.DataFrame): Flattened time series DataFrame. + sep (str): Separator used in flattened column names (default: "__"). + assessed_attrs (list of str, optional): List of attributes to annotate. + If None, annotate the context (instance-level). Returns: - Pandas DataFrame with additional quality annotation columns. + Tuple[pd.DataFrame, pd.DataFrame]: (updated context_df, updated time_series_df) """ - annotated_data = data.copy() - new_columns = {} # Dictionary to store new columns + # Copy inputs to avoid mutation + context_df = context_df.copy() + time_series_df = time_series_df.copy() - # Ensure the 'type' column exists; if not, create it - if "type" not in annotated_data.columns: - new_columns["type"] = type - - # Ensure the 'id' column exists; if not, create it - if "id" not in annotated_data.columns: - new_columns["id"] = annotated_data.apply( - lambda row: f"urn:ngsi-ld:{row['type']}:{row.name}", axis=1 - ) - - # Handle @context column (optional) - if context_value is not None: # Only add @context if context_value is provided - if "@context" not in annotated_data.columns: - if isinstance(context_value, list): - # Apply the list across all rows - new_columns["@context"] = [context_value] * len(annotated_data) - elif isinstance(context_value, str): - # Apply the string across all rows - new_columns["@context"] = context_value + entity_id = context_df.loc[0, "id"] + entity_type = context_df.loc[0, "type"] if assessed_attrs is None: - # Annotate the entire instance (data point) - new_columns["hasQuality.type"] = "Relationship" - new_columns["hasQuality.object"] = annotated_data.apply( - lambda row: f"urn:ngsi-ld:DataQualityAssessment:{entity_type}:{row['id']}", - axis=1, + # Instance-level annotation → attach to context + context_df[f"hasQuality{sep}type"] = "Relationship" + context_df[f"hasQuality{sep}object"] = ( + f"urn:ngsi-ld:DataQualityAssessment:{entity_type}:{entity_id}" ) else: - # Annotate specific attributes + # Attribute-level annotation → apply per-attribute, per-row for attr in assessed_attrs: - # Identify columns that start with the attribute name - matching_columns = [col for col in data.columns if col.startswith(attr)] - if not matching_columns: - raise ValueError(f"Attribute '{attr}' not found in DataFrame columns.") + attr_cols = [ + col for col in time_series_df.columns if col.startswith(f"{attr}{sep}") + ] + if not attr_cols: + raise ValueError(f"Attribute '{attr}' not found in DataFrame.") + + rows_to_annotate = time_series_df[attr_cols].notna().any(axis=1) - # Add quality annotation for each matching attribute column - for col in matching_columns: - base_attr = col.split(".")[0] # Extract the base attribute name - quality_type_col = f"{base_attr}.hasQuality.type" - quality_object_col = f"{base_attr}.hasQuality.object" + quality_type_col = f"{attr}{sep}hasQuality{sep}type" + quality_obj_col = f"{attr}{sep}hasQuality{sep}object" - # Collect new columns in the dictionary - new_columns[quality_type_col] = "Relationship" - new_columns[quality_object_col] = annotated_data.apply( - lambda row: f"urn:ngsi-ld:DataQualityAssessment:{entity_type}:{row['id']}:{base_attr}".split( - "[" - )[ - 0 - ], - axis=1, - ) + # Initialize empty columns with None + time_series_df[quality_type_col] = None + time_series_df[quality_obj_col] = None - # Update DataFrame in one go using `pd.concat` to avoid 
fragmentation
-    annotated_data = pd.concat([annotated_data, pd.DataFrame(new_columns)], axis=1)
+            # Apply values only to relevant rows
+            time_series_df.loc[rows_to_annotate, quality_type_col] = "Relationship"
+            time_series_df.loc[rows_to_annotate, quality_obj_col] = (
+                f"urn:ngsi-ld:DataQualityAssessment:{entity_type}:{entity_id}:{attr}"
+            )
 
-    return annotated_data
+    return context_df, time_series_df

From 6c94d20076784c94739bd854b2d7106774f2507d Mon Sep 17 00:00:00 2001
From: shahin
Date: Fri, 1 Aug 2025 01:12:56 +0200
Subject: [PATCH 3/7] update data formatter component for handling time-series
 data

---
 .../utils/data_formatter.py | 127 ++++++++++--------
 1 file changed, 68 insertions(+), 59 deletions(-)

diff --git a/InteroperabilityEnabler/utils/data_formatter.py b/InteroperabilityEnabler/utils/data_formatter.py
index ceef96f..4e5deba 100644
--- a/InteroperabilityEnabler/utils/data_formatter.py
+++ b/InteroperabilityEnabler/utils/data_formatter.py
@@ -10,91 +10,100 @@
 import json
 import pandas as pd
-from io import StringIO
 
 
-def data_to_dataframe(data):
+def data_formatter(data, sep="__"):
     """
-    Convert data from file path or raw JSON/JSON-LD into a flattened pandas DataFrame.
+    Formats input data (in JSON) into structured DataFrames via the data_to_dataframe
+    function. It accepts either a file path to a .json file or an already-parsed
+    Python object (dictionary or list), and passes the given separator through to
+    data_to_dataframe.
+
+    If the input is not in a supported format or type, the error is caught and
+    (None, None) is returned.
 
     Args:
-        data (str | dict | list): Path to a data file or a JSON/JSON-LD object.
+        data: The input data to be formatted. Either a file path ending in .json,
+            or a Python object such as a dictionary or list.
+        sep: The separator string used in formatting. Defaults to "__".
 
     Returns:
-        pd.DataFrame: Flattened data as a DataFrame.
+        Tuple[pd.DataFrame, pd.DataFrame]: The (context, time series) DataFrames,
+        or (None, None) if processing fails.
     """
-    df = None
     try:
         if isinstance(data, str):
-            # Handle file path
-            if data.endswith(".xls") or data.endswith(".xlsx"):
-                df = pd.read_excel(data)
-            elif data.endswith(".csv") :
-                df = pd.read_csv(data)
-            elif data.endswith(".json") or data.endswith(".jsonld"):
+            if data.endswith(".json"):
                 with open(data, "r", encoding="utf-8") as file:
-                    json_data = json.load(file)
-                entities = json_data if isinstance(json_data, list) else json_data.get("@graph", [json_data])
-                df = pd.DataFrame([flatten_dict(e) for e in entities])
-                df.reset_index(drop=True, inplace=True)
+                    raw_data = json.load(file)
+                return data_to_dataframe(raw_data, sep=sep)
             else:
-                # Check if it's raw CSV content (contains commas and newlines)
-                if '\n' in data and (',' in data or ';' in data):
-                    try:
-                        df = pd.read_csv(StringIO(data))
-                    except pd.errors.ParserError:
-                        df = pd.read_csv(StringIO(data), sep=';')
-                else:
-                    raise ValueError("Unsupported file format or content. Must be .xls, .xlsx, .csv, .json, .jsonld, or raw CSV content")
+                raise ValueError(
+                    "Unsupported file format: string input must be a path to a .json file"
+                )
         elif isinstance(data, (dict, list)):
-            # Handle raw JSON or JSON-LD object directly
-            entities = data if isinstance(data, list) else data.get("@graph", [data])
-            df = pd.DataFrame([flatten_dict(e) for e in entities])
-            df.reset_index(drop=True, inplace=True)
+            return data_to_dataframe(data, sep=sep)
         else:
-            raise ValueError("Unsupported input type. Must be file path or JSON object.")
+            raise ValueError(
+                "Unsupported input type. Must be a file path or JSON object."
+            )
     except Exception as e:
         print(f"Error processing data: {e}")
-    return df
+        return None, None
 
 
-def flatten_dict(d, parent_key="", sep=".", preserve_keys=None):
+def data_to_dataframe(raw_data, sep):
     """
-    Recursively flattens a nested dictionary into a flat dictionary.
+    Converts time-series data (in JSON) into two structured DataFrames: a context
+    DataFrame and a time series DataFrame.
+    The context DataFrame is a single-row DataFrame containing the id and type of
+    the entity. The time series DataFrame contains the flattened, chronologically
+    sorted time-based data (data points).
 
     Args:
-        d (dict): The dictionary to flatten.
-        parent_key (str): Prefix for keys during recursion.
-        sep (str): Separator used for key hierarchy.
-        preserve_keys (list): Keys whose values should not be flattened.
+        raw_data: Dict containing time-series data.
+        sep: String separator used to create flat column names by combining keys
+            and attributes from the hierarchical structure.
 
     Returns:
-        dict: A flattened dictionary.
+        Tuple[pd.DataFrame, pd.DataFrame]: A tuple containing two pandas DataFrames:
+            - A context DataFrame.
+            - A time series DataFrame where each row corresponds to a single observed
+              timestamp, flattened with attributes prefixed with their associated key.
+              The "observedAt" column is converted to UNIX epoch seconds.
""" - if preserve_keys is None: - preserve_keys = ["coordinates", "@context"] + # Build context DataFrame + context_keys = ["id", "type"] + context = {k: raw_data[k] for k in context_keys} + context_df = pd.DataFrame([context]) - items = [] - for k, v in d.items(): - new_key = f"{parent_key}{sep}{k}" if parent_key else k + # Build dynamic flat rows + rows = [] - if isinstance(v, dict): - if k in preserve_keys: - items.append((new_key, v)) - else: - items.extend(flatten_dict(v, new_key, sep=sep, preserve_keys=preserve_keys).items()) + for key, val_list in raw_data.items(): + if ( + isinstance(val_list, list) + and val_list + and isinstance(val_list[0], dict) + and "observedAt" in val_list[0] + ): + for entry in val_list: + timestamp = entry["observedAt"] + row = {"observedAt": timestamp} + for attr_key, attr_val in entry.items(): + if attr_key == "observedAt": + continue + row[f"{key}{sep}{attr_key}"] = attr_val + rows.append(row) - elif isinstance(v, list): - if k in preserve_keys: - items.append((new_key, v)) - else: - for i, item in enumerate(v): - if isinstance(item, dict): - items.extend(flatten_dict(item, f"{new_key}[{i}]", sep=sep, preserve_keys=preserve_keys).items()) - else: - items.append((f"{new_key}[{i}]", item)) + # Combine and reshape + time_series_df = pd.DataFrame(rows) - else: - items.append((new_key, v)) + # Handle potential duplicates by grouping + time_series_df = time_series_df.groupby("observedAt").first().reset_index() + + # Sort chronologically + time_series_df["observedAt"] = pd.to_datetime(time_series_df["observedAt"]) + time_series_df = time_series_df.sort_values("observedAt").reset_index(drop=True) + + # Convert to UNIX timestamp + time_series_df["observedAt"] = time_series_df["observedAt"].astype(int) // 10**9 - return dict(items) + return context_df, time_series_df \ No newline at end of file From e8f3ae68c0080ed64b9efdbb698f52d5c0df09da Mon Sep 17 00:00:00 2001 From: shahin Date: Fri, 1 Aug 2025 01:19:30 +0200 Subject: [PATCH 4/7] update the data mapper component --- InteroperabilityEnabler/utils/data_mapper.py | 215 +++++++------------ 1 file changed, 74 insertions(+), 141 deletions(-) diff --git a/InteroperabilityEnabler/utils/data_mapper.py b/InteroperabilityEnabler/utils/data_mapper.py index 46fcf24..77a4340 100644 --- a/InteroperabilityEnabler/utils/data_mapper.py +++ b/InteroperabilityEnabler/utils/data_mapper.py @@ -6,159 +6,92 @@ Author: Shahin ABDOUL SOUKOUR - Inria Maintainer: Shahin ABDOUL SOUKOUR - Inria """ - -from collections import OrderedDict from datetime import datetime +import pandas as pd -def data_conversion(df, entity_type=None, context_value=None): +def data_mapper( + context_df: pd.DataFrame, time_series_df: pd.DataFrame, sep="__" +) -> dict: """ - Convert a DataFrame into NGSI-LD format. + Maps data from context and time series DataFrames into a structured dictionary format, + while organizing instance-level quality annotations and grouping attributes from time + series data. The function ensures proper nesting of "hasQuality" fields, utilizes a + custom separator for splitting field names, and preserves timestamp data in ISO 8601 format. Args: - df (DataFrame): The input DataFrame (from CSV, XLS/XLSX or flattened NGSI-LD JSON). - entity_type (str): The default entity type to use for CSV data. - context_value (str or list, optional): The default @context value to use if missing or null. + context_df (pd.DataFrame): The context DataFrame, expected to contain a single row + representing context-level metadata. 
+ time_series_df (pd.DataFrame): The time series DataFrame containing multiple rows, + with each row representing attribute values observed over time along with + a timestamp field named "observedAt". + sep (str): Separator string used to delineate composite field names in the time + series DataFrame. Default is "__". Returns: - A NGSI-LD data. + dict: A dictionary containing context-level attributes along with grouped and + timestamped attribute data from the time series DataFrame. """ - timestamp_columns = [ - "UnixTime.value", - "UnixTime", - "observedAt", - "createdAt", - "modifiedAt", - "deletedAt", - "start", - "end", - "startAt", - "endAt", - "dateObserved.value", - "dateObserved", - "dateCreated", - "dateModified", - "endTimeAt", - "expriresAt", - "lastFailure", - "lastNotification", - "lastSuccess", - "notifiedAt", - "timeAt", - "testedAt", - ] - - ngsi_ld_entities = [] - - # Iterate over each row in the DataFrame - for index, row in df.iterrows(): - # Handle 'id' - entity_id = row.get("id") - if not entity_id or str(entity_id).lower() == "null": - entity_id = f"urn:ngsi-ld:{entity_type}:{index}" - - # Handle 'type' - entity_type_value = row.get("type") - if not entity_type_value or str(entity_type_value).lower() == "null": - entity_type_value = entity_type - - # Initialize the entity - entity = OrderedDict() - entity["id"] = entity_id - entity["type"] = entity_type_value - - # Handle '@context' - existing_context = row.get("@context") - if existing_context and str(existing_context).lower() != "null": - context_to_add = existing_context # Preserve the existing @context - elif context_value is not None: - context_to_add = ( - context_value if isinstance(context_value, list) else context_value - ) + # Extract context as dict + context = context_df.iloc[0].to_dict() + + # Handle instance-level hasQuality annotation from context + instance_type_key = f"hasQuality{sep}type" + instance_object_key = f"hasQuality{sep}object" + if instance_type_key in context and instance_object_key in context: + if pd.notna(context[instance_type_key]) and pd.notna( + context[instance_object_key] + ): + context["hasQuality"] = { + "type": context.pop(instance_type_key), + "object": context.pop(instance_object_key), + } else: - context_to_add = None # No context specified + context.pop(instance_type_key, None) + context.pop(instance_object_key, None) + + # Prepare time series attribute grouping + attribute_groups = {} + + for _, row in time_series_df.iterrows(): + ts = row["observedAt"] + ts_iso = datetime.utcfromtimestamp(int(ts)).strftime("%Y-%m-%dT%H:%M:%SZ") + + attr_temp = {} - # Process each column - for column in df.columns: - # Skip 'id', 'type', and '@context' since they're already handled - if column in ["id", "type", "@context"]: + for col, val in row.items(): + if col == "observedAt": continue - value = row[column] - - # Check if the column is one of the timestamp columns - if column in timestamp_columns and isinstance(value, (int, float)): - try: - # Convert timestamp to datetime format - value = datetime.utcfromtimestamp(float(value)).strftime( - "%Y-%m-%dT%H:%M:%SZ" - ) - except (ValueError, TypeError): - pass # If conversion fails, keep the original value - - if "." 
in column:
-                # Handle nested attributes (assume NGSI-LD JSON format)
-                parts = column.split(".")
-                if parts[0] not in entity:
-                    entity[parts[0]] = {}
-                current_level = entity[parts[0]]
-                for part in parts[1:-1]:
-                    if part not in current_level:
-                        current_level[part] = {}
-                    current_level = current_level[part]
-                current_level[parts[-1]] = value
-            else:
-                # Treat as Property for CSV-originating data
-                entity[column] = {
-                    "type": "Property",  ### <--
-                    "value": value,
-                }
-
-        # Check for "type": "null" and replace with "type": "Property"
-        for key, attribute in entity.items():
-            if isinstance(attribute, dict) and attribute.get("type") == "null":
-                attribute["type"] = "Property"
-
-        # Add @context at the end if it exists
-        if context_to_add is not None:
-            entity["@context"] = context_to_add
-
-        # Append the constructed entity
-        ngsi_ld_entities.append(entity)
-
-    return ngsi_ld_entities
-
-
-def restore_ngsi_ld_structure(ngsi_ld_data):
-    """
-    Restore the NGSI-LD structure.
+            if sep in col:
+                attr, field = col.split(sep, 1)
+                if attr not in attr_temp:
+                    attr_temp[attr] = {}
+                attr_temp[attr][field] = val
 
-    Args:
-        data: The NGSI-LD data to be processed. It can be a dictionary or a list of dictionaries.
+        for attr, data in attr_temp.items():
+            data["observedAt"] = ts_iso
 
-    Returns:
-        A NGSI-LD data restored.
-    """
-    # Handling list recursively
-    # Ensure that nested lists (if any) are processed correctly
-    if isinstance(
-        ngsi_ld_data, list
-    ):  # If data is a list, we recursively process each item in the list
-        return [restore_ngsi_ld_structure(item) for item in ngsi_ld_data]
-
-    restored_data = {}
-    for key, value in ngsi_ld_data.items():
-        if "[" in key:  # If a key contains [number] (e.g., "availableBikeNumber[0]")
-            base_key = key.split("[")[
-                0
-            ]  # Extract base_key (e.g., "availableBikeNumber")
-            restored_data.setdefault(base_key, []).append(
-                value
-            )  # To initialize a list if it does not exist and append the value to the list
-        else:
-            # If value is another dictionary, recursively process it
-            # Otherwise, store the value as is
-            restored_data[key] = (
-                restore_ngsi_ld_structure(value) if isinstance(value, dict) else value
-            )
-    return restored_data
+            # Detect and nest hasQuality fields if present
+            hq_type_key = "hasQuality" + sep + "type"
+            hq_obj_key = "hasQuality" + sep + "object"
+
+            hq_type = data.pop(hq_type_key, None)
+            hq_obj = data.pop(hq_obj_key, None)
+
+            # Always add a hasQuality key, with None if missing
+            data["hasQuality"] = {
+                "type": None if pd.isna(hq_type) else hq_type,
+                "object": None if pd.isna(hq_obj) else hq_obj,
+            }
+
+            # Store observations per attribute
+            if attr not in attribute_groups:
+                attribute_groups[attr] = []
+            attribute_groups[attr].append(data)
+
+    # Merge and return
+    return {**context, **attribute_groups}

From 752e5616f756e0033f72dcddf2749fd9242e1a4c Mon Sep 17 00:00:00 2001
From: shahin
Date: Fri, 1 Aug 2025 01:24:08 +0200
Subject: [PATCH 5/7] correct the example_json.json path for testing

---
 tests/test_basic.py | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/tests/test_basic.py b/tests/test_basic.py
index 9f9577f..bad203a 100644
--- a/tests/test_basic.py
+++ b/tests/test_basic.py
@@ -1,7 +1,6 @@
 import pandas as pd
-import numpy as np
 import pytest
-from InteroperabilityEnabler.utils.data_formatter import data_to_dataframe
+from InteroperabilityEnabler.utils.data_formatter import data_formatter
 from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df
 from io import StringIO
 from InteroperabilityEnabler.utils.merge_data import merge_predicted_data
@@ -11,7 +10,7 @@
 import json
 
 
-FILE_PATH_JSON = "example_json.json"
+FILE_PATH_JSON = "tests/example_json.json"
 
 
 MOCK_CSV = """
@@ -44,7 +43,7 @@ def test_data_formatter(file_path):
         json_data = json.load(f)
 
     # Run the formatter
-    context_df, time_series_df = data_to_dataframe(json_data, sep="__")
+    context_df, time_series_df = data_formatter(json_data, sep="__")
 
     # Assertions (use `assert`, not `assertEqual`)
     assert context_df.iloc[0]["id"] == "urn:sedimark:station:1"
@@ -62,7 +61,7 @@ def test_instance_level_annotation(file_path):
         json_data = json.load(f)
 
     # Convert to DataFrames
-    context_df, time_series_df = data_to_dataframe(json_data, sep="__")
+    context_df, time_series_df = data_formatter(json_data, sep="__")
 
     # Apply instance-level annotation
     updated_context_df, updated_time_series_df = add_quality_annotations_to_df(
@@ -90,7 +89,7 @@ def test_attribute_level_annotation(file_path):
         json_data = json.load(f)
 
     # Convert to DataFrames
-    context_df, time_series_df = data_to_dataframe(json_data, sep="__")
+    context_df, time_series_df = data_formatter(json_data, sep="__")
 
     # Apply attribute-level annotation on 'pm10'
     updated_context_df, updated_time_series_df = add_quality_annotations_to_df(
@@ -129,7 +128,7 @@ def test_data_mapper(file_path):
         json_data = json.load(f)
 
     # Format data
-    context_df, time_series_df = data_to_dataframe(json_data, sep="__")
+    context_df, time_series_df = data_formatter(json_data, sep="__")
 
     # Apply attribute-level annotation on 'no2'
     context_df, time_series_df = add_quality_annotations_to_df(

From 6ff4a33c3968d29b4531700a442b74f79804224a Mon Sep 17 00:00:00 2001
From: shahin
Date: Fri, 1 Aug 2025 01:32:07 +0200
Subject: [PATCH 6/7] add some comments

---
 tests/test_basic.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/tests/test_basic.py b/tests/test_basic.py
index bad203a..339816c 100644
--- a/tests/test_basic.py
+++ b/tests/test_basic.py
@@ -38,6 +38,10 @@
 
 @pytest.mark.parametrize("file_path", [FILE_PATH_JSON])
 def test_data_formatter(file_path):
+    """
+    Data Formatter component tests.
+    Converts JSON data to DataFrames.
+    """
     # Load the JSON file from disk
     with open(file_path, "r", encoding="utf-8") as f:
         json_data = json.load(f)
@@ -56,6 +60,10 @@
 
 @pytest.mark.parametrize("file_path", [FILE_PATH_JSON])
 def test_instance_level_annotation(file_path):
+    """
+    Data quality annotation component tests.
+    Instance-level annotation.
+    """
     # Load JSON data
     with open(file_path, "r", encoding="utf-8") as f:
         json_data = json.load(f)
@@ -84,6 +92,10 @@
 
 @pytest.mark.parametrize("file_path", [FILE_PATH_JSON])
 def test_attribute_level_annotation(file_path):
+    """
+    Data quality annotation component tests.
+    Attribute-level annotation.
+    """
     # Load JSON data from file
     with open(file_path, "r", encoding="utf-8") as f:
         json_data = json.load(f)
@@ -123,6 +135,10 @@
 
 @pytest.mark.parametrize("file_path", [FILE_PATH_JSON])
 def test_data_mapper(file_path):
+    """
+    Data Mapper component tests.
+    Converts structured data to JSON format.
+ """ # Load JSON data with open(file_path, "r", encoding="utf-8") as f: json_data = json.load(f) From 35b5f42c29feaf5740486056bb4cb568bf71c86c Mon Sep 17 00:00:00 2001 From: shahin Date: Fri, 1 Aug 2025 01:55:40 +0200 Subject: [PATCH 7/7] update readme.md files --- README.md | 56 +++++++++++++++------------------------------- README_package.md | 57 +++++++++++++++-------------------------------- 2 files changed, 36 insertions(+), 77 deletions(-) diff --git a/README.md b/README.md index e64ccf8..4fa63c7 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,9 @@ Interoperability Enabler (IE) component is designed to facilitate seamless integ ## Key Feature -- Data Formatter - Convert data from various formats into the SEDIMARK internal processing format (pandas DataFrames) +- Data Formatter - Convert JSON data (time-series data) into the SEDIMARK internal processing format (pandas DataFrames) - Data Quality Annotations - Enable adding any kind of quality annotations to data inside pandas DataFrames -- Data Mapper – Convert data from pandas DataFrames into NGSI-LD json +- Data Mapper – Convert data from pandas DataFrames into JSON - Data Extractor – Extract relevant data from a pandas DataFrame - Metadata Restorer – Restore metadata to a pandas DataFrame - Data Merger – Merge two DataFrames by matching column names @@ -58,14 +58,12 @@ InteroperabilityEnabler #### Data Formatter (to convert the input data into a pandas DataFrame) ```python -from InteroperabilityEnabler.utils.data_formatter import data_to_dataframe +from InteroperabilityEnabler.utils.data_formatter import data_formatter -FILE_PATH="sample.jsonld" -df = data_to_dataframe(FILE_PATH) +FILE_PATH="sample.json" +context_df, time_series_df = data_formatter(FILE_PATH) ``` -It recursively flattens dictionaries while preserving key hierarchies, supporting nested structures and ensuring efficient processing and interoperability. 
- #### Data Quality Annotations (to enrich pandas DataFrames by adding quality annotations) @@ -73,45 +71,27 @@ Instance-level annotations: ```python from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df -entity_type_annotation = "entity_type_value" # entity type for quality annotations -annotated_df = add_quality_annotations_to_df( - df, - entity_type = entity_type_annotation, - assessed_attrs = None, - # type = "new_type", # If there is no type in the input file, a new one can be created - # context_value = [link1, link2] # If there is no @context in the input file, a new one can be created -) -``` - -Attribut-level annotation: -```python -from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df - -entity_type_annotation = "entity_type_value" # entity type for quality annotations -assessed_attrs = ["attribut_name"] # Base attribute name (metadata) -annotated_df = add_quality_annotations_to_df( - df, entity_type = entity_type_annotation, assessed_attrs = assessed_attrs +context_df, annotated_df = add_quality_annotations_to_df( + context_df, time_series_df, assessed_attrs=None ) ``` -Granular-level annotation: +Attribute-level annotation: ```python from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df -entity_type_annotation = "entity_type_value" # entity type for quality annotations -assessed_attrs = ["currentTripCount[0]"] # Base attribute name (metadata) - with the indice -annotated_df = add_quality_annotations_to_df( - df, entity_type = entity_type_annotation, assessed_attrs = assessed_attrs +assessed_attrs = ["no"] # Base attribute name +context_df, annotated_df = add_quality_annotations_to_df( + context_df, time_series_df, assessed_attrs=assessed_attrs ) ``` -#### Data Mapper (to convert the DataFrame into NGSI-LD json format) +#### Data Mapper (to convert the DataFrame into JSON format) ```python -from InteroperabilityEnabler.utils.data_mapper import data_conversion, restore_ngsi_ld_structure +from InteroperabilityEnabler.utils.data_mapper import data_mapper -data = data_conversion(annotated_df) -data_restored = restore_ngsi_ld_structure(data) # to restore the original NGSI-LD structure +data_json = data_mapper(context_df, annotated_df) ``` #### Data Extractor (to extract and return specific columns from a pandas DataFrame) @@ -120,9 +100,9 @@ data_restored = restore_ngsi_ld_structure(data) # to restore the original NGSI-L from InteroperabilityEnabler.utils.extract_data import extract_columns # Select columns by index -column_indices = [5, 7] +column_indices = [2, 5] -selected_df, selected_column_names = extract_columns(df, column_indices) +selected_df, selected_column_names = extract_columns(time_series_df, column_indices) print("\nSelected DataFrame:") print(selected_df) @@ -151,10 +131,10 @@ predicted_df = add_metadata_to_predictions_from_dataframe( from InteroperabilityEnabler.utils.merge_data import merge_predicted_data # To combine the original input data with the corresponding prediction results from an AI model -merged_df = merge_predicted_data(df, predicted_df) +merged_df = merge_predicted_data(time_series_df, predicted_df) ``` ## Acknowledgement -This software has been developed by the [Inria](https://www.inria.fr/fr) under the [SEDIMARK(SEcure Decentralised Intelligent Data MARKetplace)](https://sedimark.eu/) project. 
+This software has been developed by [Inria](https://www.inria.fr/fr) under the [SEDIMARK(SEcure Decentralised Intelligent Data MARKetplace)](https://sedimark.eu/) project. SEDIMARK is funded by the European Union under the Horizon Europe framework programme [grant no. 101070074]. diff --git a/README_package.md b/README_package.md index e470cfd..1e513a8 100644 --- a/README_package.md +++ b/README_package.md @@ -5,9 +5,9 @@ Interoperability Enabler (IE) component is designed to facilitate seamless integ ## Key Feature -- Data Formatter - Convert data from various formats into the SEDIMARK internal processing format (pandas DataFrames) +- Data Formatter - Convert JSON data (time-series data) into the SEDIMARK internal processing format (pandas DataFrames) - Data Quality Annotations - Enable adding any kind of quality annotations to data inside pandas DataFrames -- Data Mapper – Convert data from pandas DataFrames into NGSI-LD json +- Data Mapper – Convert data from pandas DataFrames into JSON - Data Extractor – Extract relevant data from a pandas DataFrame - Metadata Restorer – Restore metadata to a pandas DataFrame - Data Merger – Merge two DataFrames by matching column names @@ -27,60 +27,39 @@ pip install InteroperabilityEnabler #### Data Formatter (to convert the input data into a pandas DataFrame) ```python -from InteroperabilityEnabler.utils.data_formatter import data_to_dataframe +from InteroperabilityEnabler.utils.data_formatter import data_formatter -FILE_PATH="sample.jsonld" -df = data_to_dataframe(FILE_PATH) +FILE_PATH="sample.json" +context_df, time_series_df = data_formatter(FILE_PATH) ``` -It recursively flattens dictionaries while preserving key hierarchies, supporting nested structures and ensuring efficient processing and interoperability. 
- - #### Data Quality Annotations (to enrich pandas DataFrames by adding quality annotations) Instance-level annotations: ```python from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df -entity_type_annotation = "entity_type_value" # entity type for quality annotations -annotated_df = add_quality_annotations_to_df( - df, - entity_type = entity_type_annotation, - assessed_attrs = None, - # type = "new_type", # If there is no type in the input file, a new one can be created - # context_value = [link1, link2] # If there is no @context in the input file, a new one can be created -) -``` - -Attribut-level annotation: -```python -from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df - -entity_type_annotation = "entity_type_value" # entity type for quality annotations -assessed_attrs = ["attribut_name"] # Base attribute name (metadata) -annotated_df = add_quality_annotations_to_df( - df, entity_type = entity_type_annotation, assessed_attrs = assessed_attrs +context_df, annotated_df = add_quality_annotations_to_df( + context_df, time_series_df, assessed_attrs=None ) ``` -Granular-level annotation: +Attribute-level annotation: ```python from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df -entity_type_annotation = "entity_type_value" # entity type for quality annotations -assessed_attrs = ["currentTripCount[0]"] # Base attribute name (metadata) - with the indice -annotated_df = add_quality_annotations_to_df( - df, entity_type = entity_type_annotation, assessed_attrs = assessed_attrs +assessed_attrs = ["no"] # Base attribute name +context_df, annotated_df = add_quality_annotations_to_df( + context_df, time_series_df, assessed_attrs=assessed_attrs ) ``` -#### Data Mapper (to convert the DataFrame into NGSI-LD json format) +#### Data Mapper (to convert the DataFrame into JSON format) ```python -from InteroperabilityEnabler.utils.data_mapper import data_conversion, restore_ngsi_ld_structure +from InteroperabilityEnabler.utils.data_mapper import data_mapper -data = data_conversion(annotated_df) -data_restored = restore_ngsi_ld_structure(data) # to restore the original NGSI-LD structure +data_json = data_mapper(context_df, annotated_df) ``` #### Data Extractor (to extract and return specific columns from a pandas DataFrame) @@ -89,9 +68,9 @@ data_restored = restore_ngsi_ld_structure(data) # to restore the original NGSI-L from InteroperabilityEnabler.utils.extract_data import extract_columns # Select columns by index -column_indices = [5, 7] +column_indices = [2, 5] -selected_df, selected_column_names = extract_columns(df, column_indices) +selected_df, selected_column_names = extract_columns(time_series_df, column_indices) print("\nSelected DataFrame:") print(selected_df) @@ -120,10 +99,10 @@ predicted_df = add_metadata_to_predictions_from_dataframe( from InteroperabilityEnabler.utils.merge_data import merge_predicted_data # To combine the original input data with the corresponding prediction results from an AI model -merged_df = merge_predicted_data(df, predicted_df) +merged_df = merge_predicted_data(time_series_df, predicted_df) ``` ## Acknowledgement -This software has been developed by the [Inria](https://www.inria.fr/fr) under the [SEDIMARK(SEcure Decentralised Intelligent Data MARKetplace)](https://sedimark.eu/) project. 
+This software has been developed by [Inria](https://www.inria.fr/fr) under the [SEDIMARK(SEcure Decentralised Intelligent Data MARKetplace)](https://sedimark.eu/) project. SEDIMARK is funded by the European Union under the Horizon Europe framework programme [grant no. 101070074].