From 4c2520bd1ee64ac0d8931d2e2fa86b6206394d07 Mon Sep 17 00:00:00 2001 From: LSJ Date: Tue, 30 Apr 2024 07:58:55 +0200 Subject: [PATCH 01/15] Fix to LCAbyg export algorithm * Layer nodes are now exported only once * Comment property on exported nodes now defaults to empty string * Conversion or mass_factor is now exported correctly * Stages now contains the specific type of stage in its name --- src/logic/export/lcabyg/nodes.py | 10 +++++----- src/logic/export/to_lcabyg.py | 22 +++++++++++++++------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/logic/export/lcabyg/nodes.py b/src/logic/export/lcabyg/nodes.py index 64f2323..11b5414 100644 --- a/src/logic/export/lcabyg/nodes.py +++ b/src/logic/export/lcabyg/nodes.py @@ -38,7 +38,7 @@ class ClassNode(Node): def __init__(self, entity: SchemaCategory | SchemaElement): self.id = entity.id self.name = getattr(entity, "name", None) or entity.type_code_element.name - self.comment = entity.description + self.comment = entity.description or "" self.unit = self._get_unit(entity) self.element_category_id = self._resolve_category_name(entity) super().__init__() @@ -79,9 +79,9 @@ def _resolve_category_name(entity: SchemaCategory | SchemaElement) -> str | None class DictNode(Node): - def __init__(self, entity: dict): + def __init__(self, entity: dict, name: str = None): self.id = entity.get("id") - self.name = entity.get("name") + self.name = name or entity.get("name") self.comment = entity.get("description", "") or "" super().__init__() @@ -186,7 +186,7 @@ def __init__(self, entity: dict, stage: str): self.valid_to = epd.get("validUntil") self.mass_factor = self._get_mass_factor(epd) self.element_category_id = "f1bca522-df0d-4b74-a5de-cdecc2763d05" - super().__init__(epd) + super().__init__(epd, f'{epd.get("name")} ({stage})') def set_indicators(self, entity: dict, stage: str): if stage == "A1to3": @@ -206,7 +206,7 @@ def set_indicators(self, entity: dict, stage: str): @staticmethod def _get_mass_factor(entity: dict) -> float: for conversion in entity.get("conversions"): - if conversion.get("to") == "kg": + if conversion.get("to").casefold() == "kg": return conversion.get("value") def as_dict(self) -> dict: diff --git a/src/logic/export/to_lcabyg.py b/src/logic/export/to_lcabyg.py index f227498..1710d3d 100644 --- a/src/logic/export/to_lcabyg.py +++ b/src/logic/export/to_lcabyg.py @@ -45,6 +45,7 @@ def aggregate_lcabyg_models( """ entity_list = [] + layer_nodes = {} for category in schema_categories: if category.elements == []: # pragma: no branch continue @@ -60,12 +61,19 @@ def aggregate_lcabyg_models( for assembly in assemblies: if assembly["id"] == element.assembly_id: for layer in assembly.get("layers", []): - layer_node = create_node(layer) - entity_list.extend([layer_node, create_edge(layer_node, element_node)]) - for phase in ["A1to3", "C3", "C4", "D"]: - phase_node = create_node((layer, phase)) - entity_list.extend( - [phase_node, create_edge(phase_node, layer_node), create_edge(phase_node)] # ]) - ) + + if layer["id"] not in layer_nodes: + layer_node = create_node(layer) + layer_nodes[layer["id"]] = layer_node + entity_list.append(layer_node) + for phase in ["A1to3", "C3", "C4", "D"]: + phase_node = create_node((layer, phase)) + entity_list.extend( + [phase_node, create_edge(phase_node, layer_node), create_edge(phase_node)] # ]) + ) + else: + layer_node = layer_nodes[layer["id"]] + + entity_list.append(create_edge(layer_node, element_node)) return entity_list From 865e5d0d9ffcaa55f67357fd21ddc0ed5a14bb65 Mon Sep 17 00:00:00 2001 From: LSJ Date: Fri, 17 May 2024 13:50:06 +0200 Subject: [PATCH 02/15] Assemblies are now exported to to lcabyg --- src/assembly/__init__.py | 0 src/assembly/models.py | 79 +++++++ src/dataclass_utils/Mixins.py | 27 +++ src/dataclass_utils/__init__.py | 0 src/logic/export/lcabyg/edges.py | 134 ------------ src/logic/export/lcabyg/graph.py | 140 +++++++++++++ src/logic/export/lcabyg/model_builder.py | 122 +++++++++++ src/logic/export/lcabyg/models.py | 37 ---- src/logic/export/lcabyg/nodes.py | 252 ----------------------- src/logic/export/lcabyg/utilities.py | 44 +++- src/logic/export/to_lcabyg.py | 63 +++--- src/logic/export/utils.py | 24 +-- 12 files changed, 445 insertions(+), 477 deletions(-) create mode 100644 src/assembly/__init__.py create mode 100644 src/assembly/models.py create mode 100644 src/dataclass_utils/Mixins.py create mode 100644 src/dataclass_utils/__init__.py delete mode 100644 src/logic/export/lcabyg/edges.py create mode 100644 src/logic/export/lcabyg/graph.py create mode 100644 src/logic/export/lcabyg/model_builder.py delete mode 100644 src/logic/export/lcabyg/models.py delete mode 100644 src/logic/export/lcabyg/nodes.py diff --git a/src/assembly/__init__.py b/src/assembly/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/assembly/models.py b/src/assembly/models.py new file mode 100644 index 0000000..d0cde97 --- /dev/null +++ b/src/assembly/models.py @@ -0,0 +1,79 @@ +from dataclasses import Field, dataclass, field +from datetime import date + +from dataclass_utils.Mixins import DataclassFromDictMixin + + +@dataclass +class AssemblyDtoBase(DataclassFromDictMixin): + id: str + name: str + + +@dataclass +class Assembly(AssemblyDtoBase): + lifetime: float + unit: str + conversion_factor: float + description: str + layers: list["Layer"] = field(default_factory=list) + + @classmethod + def _coerce_value(cls, field: Field, value: any): + if field.name == "layers": + return [Layer.from_dict(layer) for layer in value] + return super()._coerce_value(field, value) + + +@dataclass +class Layer(AssemblyDtoBase): + description: str + reference_service_life: int + transport_distance: float + transport_conversion_factor: float + # TODO: Check if this is correct. Its used to calculate amount of product used + conversion_factor: float = 0 + epd: "Epd" = None + + @classmethod + def _coerce_value(cls, field: Field, value: any): + if field.name == "epd": + return Epd.from_dict(value) + return super()._coerce_value(field, value) + + +@dataclass +class Epd(AssemblyDtoBase): + declared_unit: str + version: str + valid_until: date + published_date: date + source: str + location: str + subtype: str + reference_service_life: int + gwp: "Gwp" + comment: str = "" + conversions: list["Conversion"] = field(default_factory=list) + + @classmethod + def _coerce_value(cls, field: Field, value: any): + if field.name == "conversions": + return [Conversion.from_dict(c) for c in value] + if field.name == "gwp": + return Gwp.from_dict(value) + return super()._coerce_value(field, value) + + +@dataclass +class Conversion(DataclassFromDictMixin): + to: str + value: float + + +@dataclass +class Gwp(DataclassFromDictMixin): + a1a3: float + c3: float + c4: float + d: float diff --git a/src/dataclass_utils/Mixins.py b/src/dataclass_utils/Mixins.py new file mode 100644 index 0000000..6ee11e7 --- /dev/null +++ b/src/dataclass_utils/Mixins.py @@ -0,0 +1,27 @@ +from dataclasses import MISSING, Field, fields + + +class DataclassFromDictMixin: + @classmethod + def _get_field_value(cls, context: dict, field: Field[any]): + if field.name in context: + return context[field.name] + # default and default_factory will have the same value (MISSING) if not set. + if field.default == field.default_factory: + raise ValueError(f"Cannot find value for field {field.name} when creating {cls.__name__}") + + return field.default if field.default != MISSING else field.default_factory() + + @classmethod + def _coerce_value(cls, field: Field[any], value: any): + return value + + @classmethod + def from_dict(cls, dict: dict): + kwargs = {} + for f in fields(cls): + value = cls._get_field_value(dict, f) + value = cls._coerce_value(f, value) + kwargs[f.name] = value + + return cls(**kwargs) diff --git a/src/dataclass_utils/__init__.py b/src/dataclass_utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/logic/export/lcabyg/edges.py b/src/logic/export/lcabyg/edges.py deleted file mode 100644 index 30a6255..0000000 --- a/src/logic/export/lcabyg/edges.py +++ /dev/null @@ -1,134 +0,0 @@ -import inspect -import sys -from typing import TYPE_CHECKING, Optional - -from lcacollect_config.formatting import string_uuid - -from logic.export.lcabyg.models import Entity - -if TYPE_CHECKING: - from logic.export.lcabyg.nodes import ( - ConstructionNode, - ElementNode, - Node, - ProductNode, - StageNode, - ) - - -def create_edge(child: "Node", parent: Optional["Node"] = None) -> "Edge": - classes = {key: value for key, value in inspect.getmembers(sys.modules[__name__], inspect.isclass)} - if parent is not None: - name = f"{parent.type.value}To{child.type.value}Edge" - parent_id = parent.id - edge_class = classes[name] - return edge_class(child, parent_id) - else: - # NOTE: If only one Node is passed, create a link between the node and GenDK - name = f"CategoryTo{child.type.value}Edge" - parent_id = child.element_category_id - edge_class = classes[name] - return edge_class(child, parent_id) - - -class Edge(Entity): - """LCAByg edge model.""" - - def __init__(self, child: "Node", parent_id: str): - self.child_id = child.id - self.id = string_uuid() - self.parent_id = parent_id - - def _get_meta_data(self): - raise NotImplementedError - - def as_dict(self) -> dict: - self_dict = { - "Edge": [ - {self.name: self._get_meta_data()}, - self.parent_id, - self.child_id, - ] - } - return self_dict - - -class ElementToConstructionEdge(Edge): - def __init__(self, child: "ConstructionNode", parent_id: str): - self.name = "ElementToConstruction" - self.amount = child.amount - super().__init__(child, parent_id) - - def _get_meta_data(self): - return { - "amount": self.amount or 0, - "enabled": True, - "id": self.id, - } - - -class CategoryToConstructionEdge(Edge): - def __init__(self, child: "ConstructionNode", parent_id: str): - self.name = "CategoryToConstruction" - super().__init__(child, parent_id) - - def _get_meta_data(self): - return { - "id": self.id, - "layers": [1], - } - - -class CategoryToElementEdge(Edge): - def __init__(self, child: "ElementNode", parent_id: str): - self.name = "CategoryToElement" - super().__init__(child, parent_id) - - def _get_meta_data(self): - return { - "enabled": True, - "id": self.id, - } - - -class ConstructionToProductEdge(Edge): - def __init__(self, child: "ProductNode", parent_id: str): - self.name = "ConstructionToProduct" - self.amount = child.amount - self.unit = child.unit - self.life_span = child.life_span - super().__init__(child, parent_id) - - def _get_meta_data(self): - return { - "id": self.id, - "amount": self.amount or 0, - "unit": self.unit, - "lifespan": int(self.life_span), - "demolition": False, - "delayed_start": 0, - "enabled": True, - "expected_scenarios": [], - } - - -class ProductToStageEdge(Edge): - def __init__(self, child: "StageNode", parent_id: str): - self.name = "ProductToStage" - super().__init__(child, parent_id) - - def _get_meta_data(self): - return { - "id": self.id, - "excluded_scenarios": [], - "enabled": True, - } - - -class CategoryToStageEdge(Edge): - def __init__(self, child: "StageNode", parent_id: str): - self.name = "CategoryToStage" - super().__init__(child, parent_id) - - def _get_meta_data(self): - return self.id diff --git a/src/logic/export/lcabyg/graph.py b/src/logic/export/lcabyg/graph.py new file mode 100644 index 0000000..e08c10c --- /dev/null +++ b/src/logic/export/lcabyg/graph.py @@ -0,0 +1,140 @@ +import uuid +from dataclasses import dataclass, field + +from logic.export.lcabyg.utilities import Stages, Units + + +def _generate_id(): + return str(uuid.uuid4()) + + +def _get_name_dict(name: str): + return { + "Danish": name, + "English": name, + "German": name, + } + + +@dataclass +class LcaBygGraphEntity: + def as_dict(self) -> dict: + return {} + + +@dataclass +class Edge(LcaBygGraphEntity): + id: str = field(init=False, default_factory=_generate_id) + type: str + parent: str + child: str + metadata: dict = None + + def as_dict(self): + return {"Edge": [{self.type: {"id": self.id, **(self.metadata or {})}}, self.parent, self.child]} + + +@dataclass +class CategoryToStageEdge(LcaBygGraphEntity): + id: str = field(init=False, default_factory=_generate_id) + parent: str + child: str + + def as_dict(self): + return {"Edge": [{"CategoryToStage": self.id}, self.parent, self.child]} + + +@dataclass +class Node: + id: str + name: str + comment: str + locked: bool = field(init=False, default=True) + source: str = field(init=False, default="User") + + def _add_to_dict(self, dict: dict): + dict["id"] = self.id + dict["name"] = _get_name_dict(self.name) + dict["comment"] = self.comment + dict["locked"] = self.locked + dict["source"] = self.source + + def as_dict(self): + dict = {} + self._add_to_dict(dict) + return { + "Node": { + self.__class__.__name__: dict, + } + } + + +@dataclass +class Element(Node): + enabled: bool = field(init=False, default=True) + + def _add_to_dict(self, dict: dict): + super()._add_to_dict(dict) + dict["enabled"] = self.enabled + + +@dataclass +class Construction(Node): + unit: str + + def _add_to_dict(self, dict: dict): + super()._add_to_dict(dict) + dict["unit"] = Units(self.unit) + + +@dataclass +class Product(Node): + uncertainty_factor: float = field(init=False, default=1.0) + uncertainty_factor_dgnb: float = field(init=False, default=1.3) + + def _add_to_dict(self, dict: dict): + super()._add_to_dict(dict) + dict["uncertainty_factor"] = self.uncertainty_factor + dict["uncertainty_factor_dgnb"] = self.uncertainty_factor_dgnb + + +@dataclass +class Stage(Node): + stage: Stages + data_type: str + unit: str + valid_to: str + mass_factor: float + gwp: int = 0 + + def _add_to_dict(self, dict: dict): + super()._add_to_dict(dict) + dict.update( + **{ + "valid_to": self.valid_to, + "stage": self.stage, + "stage_unit": Units(self.unit), + "indicator_unit": Units(self.unit), + "stage_factor": 1.0, + "mass_factor": self.mass_factor, + "indicator_factor": 1.0, + "external_source": self.source, + "external_id": "", + "external_url": "", + "external_version": "", + "data_type": self.data_type, + "indicators": { + "GWP": self.gwp, + "ODP": 0, + "POCP": 0, + "AP": 0, + "EP": 0, + "ADPE": 0, + "ADPF": 0, + "PENR": 0, + "PER": 0, + "SENR": 0, + "SER": 0, + }, + } + ) diff --git a/src/logic/export/lcabyg/model_builder.py b/src/logic/export/lcabyg/model_builder.py new file mode 100644 index 0000000..ea1297a --- /dev/null +++ b/src/logic/export/lcabyg/model_builder.py @@ -0,0 +1,122 @@ +import uuid +from typing import Union + +from assembly.models import Assembly, Conversion, Epd, Layer +from logic.export.lcabyg.graph import ( + CategoryToStageEdge, + Construction, + Edge, + Element, + LcaBygGraphEntity, + Product, + Stage, +) +from logic.export.lcabyg.utilities import CATEGORY_RESOLVERS, EntityTypes, Stages, Units +from models.schema_category import SchemaCategory, SchemaElement + + +class LCAbygModelBuilder: + def __init__(self, category_schema: str) -> None: + self._nodes: dict = {} + self._edges: dict = {} + if category_schema not in CATEGORY_RESOLVERS: + raise ValueError(f"unsupported category schema '{category_schema}'") + self._category_resolver = CATEGORY_RESOLVERS[category_schema] + + def build(self) -> list[LcaBygGraphEntity]: + for entity in [*self._nodes.values(), *self._edges.values()]: + print(entity.__class__.__name__) + + return [*self._nodes.values(), *self._edges.values()] + + def with_element(self, entity: SchemaCategory) -> "LCAbygModelBuilder": + self._nodes[entity.id] = Element( + id=entity.id, name=entity.type_code_element.name, comment=entity.description or "" + ) + + self._add_category_edge(entity, EntityTypes.ELEMENT) + + return self + + def with_construction( + self, entity: SchemaElement | Assembly, parent: SchemaCategory | None = None + ) -> "LCAbygModelBuilder": + self._nodes[entity.id] = Construction( + id=entity.id, name=entity.name, comment=entity.description, unit=entity.unit + ) + + self._add_category_edge(entity, EntityTypes.CONSTRUCTION) + + if parent: + edge = Edge( + f"{EntityTypes.ELEMENT}To{EntityTypes.CONSTRUCTION}", + parent.id, + entity.id, + {"enabled": True, "amount": entity.quantity}, + ) + self._edges[edge.id] = edge + + return self + + def with_product(self, entity: Layer, parent: SchemaElement | Assembly): + if entity.id not in self._nodes: + self._nodes[entity.id] = Product(id=entity.id, name=entity.name, comment=entity.description or "") + + for stage in Stages: + self._add_stage(entity.epd, entity, stage) + + edge = Edge( + f"{EntityTypes.CONSTRUCTION}To{EntityTypes.PRODUCT}", + parent.id, + entity.id, + { + "amount": entity.conversion_factor, + "unit": Units(entity.epd.declared_unit), + "lifespan": int(entity.reference_service_life), + "demolition": False, + "delayed_start": 0, + "enabled": True, + "expected_scenarios": [], + }, + ) + self._edges[edge.id] = edge + + return self + + def _add_stage(self, epd: Epd, parent: Layer, stage: Stages): + id = str(uuid.uuid4()) + self._nodes[id] = Stage( + id=id, + name=f"{epd.name} ({stage})", + comment=epd.comment or "", + stage=stage, + data_type=epd.subtype, + unit=epd.declared_unit, + valid_to=epd.valid_until, + mass_factor=self._get_mass_factor(epd.conversions).value, + ) + + edge = Edge( + f"{EntityTypes.PRODUCT}To{EntityTypes.STAGE}", + parent.id, + id, + { + "excluded_scenarios": [], + "enabled": True, + }, + ) + self._edges[edge.id] = edge + + edge = CategoryToStageEdge(self._category_resolver(epd), id) + self._edges[edge.id] = edge + + _input_entities = Union[SchemaCategory, SchemaElement, Assembly, Layer, Stage] + + def _add_category_edge(self, entity: _input_entities, entity_type: EntityTypes): + parent_id = self._category_resolver(entity) + edge = Edge(f"CategoryTo{entity_type}", parent_id, entity.id) + self._edges[edge.id] = edge + + @staticmethod + def _get_mass_factor(conversions: list[Conversion]) -> Conversion: + return next(c for c in conversions if c.to.casefold() == Units.MASS.casefold()) diff --git a/src/logic/export/lcabyg/models.py b/src/logic/export/lcabyg/models.py deleted file mode 100644 index 01ff95b..0000000 --- a/src/logic/export/lcabyg/models.py +++ /dev/null @@ -1,37 +0,0 @@ -"""Module for mapping SchemaElement and SchemaCategory objects to LCAByg edges and nodes.""" -import json -import logging -from abc import ABC, abstractmethod -from enum import Enum -from typing import Optional - -from lcacollect_config.formatting import string_uuid - -from models.schema_category import SchemaCategory -from models.schema_element import SchemaElement - -from .utilities import CATEGORY_RESOLVERS, EntityTypes, Units - -logger = logging.getLogger(__name__) - - -class Entity(ABC): # pragma: no cover - """Parent class of LCAByg Node and Edge.""" - - @abstractmethod - def as_dict(self) -> dict: - """Return self in dict form.""" - pass - - def _as_json(self) -> str: - """Return self in json form.""" - return json.dumps(self.as_dict(), indent=4) - - def __str__(self): - return self._as_json() - - def __repr__(self): - try: - return f'{self.__class__.__name__}("{self.name}")' - except AttributeError as e: - return e("__repr__ was called from abstract base class.") diff --git a/src/logic/export/lcabyg/nodes.py b/src/logic/export/lcabyg/nodes.py deleted file mode 100644 index 11b5414..0000000 --- a/src/logic/export/lcabyg/nodes.py +++ /dev/null @@ -1,252 +0,0 @@ -import logging - -from lcacollect_config.formatting import string_uuid - -from logic.export.lcabyg.models import Entity -from logic.export.lcabyg.utilities import CATEGORY_RESOLVERS, EntityTypes, Units -from models.schema_category import SchemaCategory -from models.schema_element import SchemaElement - -logger = logging.getLogger(__name__) - - -def create_node(entity: SchemaCategory | SchemaElement | dict | tuple[dict, str]) -> "Node": - if isinstance(entity, SchemaElement): - return ConstructionNode(entity) - elif isinstance(entity, SchemaCategory): - return ElementNode(entity) - elif isinstance(entity, dict): - return ProductNode(entity) - elif isinstance(entity, tuple): - return StageNode(*entity) - else: - raise NotImplementedError(f"Type {type(entity)} is not implemented.") - - -class Node(Entity): - """LCAByg node model.""" - - def __init__(self): - self.locked = False - self.source = "User" - - def as_dict(self) -> dict: - raise NotImplementedError - - -class ClassNode(Node): - def __init__(self, entity: SchemaCategory | SchemaElement): - self.id = entity.id - self.name = getattr(entity, "name", None) or entity.type_code_element.name - self.comment = entity.description or "" - self.unit = self._get_unit(entity) - self.element_category_id = self._resolve_category_name(entity) - super().__init__() - - @staticmethod - def _get_unit(entity: SchemaCategory | SchemaElement | dict) -> str | None: - """Get unit from entity.""" - - valid_units = set([e.value for e in Units if e is not Units.NONE]) - if not hasattr(entity, "unit") or entity.unit.upper() == Units.NONE.value.upper(): - # Turn to default if unit not given or unit is "Pcs" (case insensitive) - return Units.NONE.value - elif entity.unit.upper() in valid_units: - # Get unit from entity if unit is valid (case insensitive) - return entity.unit.upper() - else: - raise ValueError(f"Expected unit to be one of {valid_units} but got '{entity.unit.upper()}'.") - - @staticmethod - def _resolve_category_name(entity: SchemaCategory | SchemaElement) -> str | None: - """Extract reporting schema type, resolve category names.""" - if isinstance(entity, SchemaCategory): - schema_type = entity.reporting_schema.name - elif isinstance(entity, SchemaElement): - schema_type = entity.schema_category.reporting_schema.name - else: - schema_type = "BIM7AA" - - resolver = CATEGORY_RESOLVERS.get(schema_type) - if resolver is None: - logger.error( - f"Resolver for schema type {schema_type} is undefined." - "Setting element supercategory and category to 'Other'." - ) - return "069983d0-d08b-405b-b816-d28ca9648956" - - return resolver(entity) - - -class DictNode(Node): - def __init__(self, entity: dict, name: str = None): - self.id = entity.get("id") - self.name = name or entity.get("name") - self.comment = entity.get("description", "") or "" - super().__init__() - - -class ElementNode(ClassNode): - def __init__(self, entity: SchemaCategory): - self.type = EntityTypes.ELEMENT - super().__init__(entity) - - def as_dict(self) -> dict: - return { - "Node": { - self.type.value: { - "id": self.id, - "name": { - "Danish": self.name, - "English": self.name, - "German": self.name, - }, - "active": True, - "comment": self.comment, - "enabled": True, - "source": self.source, - } - } - } - - -class ConstructionNode(ClassNode): - def __init__(self, entity: SchemaElement): - self.type = EntityTypes.CONSTRUCTION - self.amount = entity.quantity - super().__init__(entity) - - def as_dict(self) -> dict: - return { - "Node": { - self.type.value: { - "id": self.id, - "name": { - "Danish": self.name, - "English": self.name, - "German": self.name, - }, - "comment": self.comment, - "layer": 1, - "locked": True, - "source": self.source, - "unit": self.unit, - } - } - } - - -class ProductNode(DictNode): - def __init__(self, entity: dict): - self.type = EntityTypes.PRODUCT - self.life_span = entity.get("referenceServiceLife") - self.amount = entity.get("conversionFactor") - self.unit = entity.get("epd", {}).get("declaredUnit", "").upper() - super().__init__(entity) - - def as_dict(self) -> dict: - return { - "Node": { - "Product": { - "id": self.id, - "name": { - "Danish": self.name, - "English": self.name, - "German": self.name, - }, - "source": self.source, - "comment": self.comment, - "uncertainty_factor": 1.0, - "uncertainty_factor_dgnb": 1.3, - } - } - } - - -class StageNode(DictNode): - def __init__(self, entity: dict, stage: str): - epd = entity.get("epd", {}) - epd["id"] = string_uuid() - self.ser = None - self.senr = None - self.per = None - self.penr = None - self.adpe = None - self.ep = None - self.ap = None - self.pocp = None - self.odp = None - self.gwp = None - self.adpf = None - self.type = EntityTypes.STAGE - self.stage = stage - self.set_indicators(epd, stage) - self.data_type = epd.get("subtype") - self.unit = epd.get("declaredUnit", "").upper() - self.valid_to = epd.get("validUntil") - self.mass_factor = self._get_mass_factor(epd) - self.element_category_id = "f1bca522-df0d-4b74-a5de-cdecc2763d05" - super().__init__(epd, f'{epd.get("name")} ({stage})') - - def set_indicators(self, entity: dict, stage: str): - if stage == "A1to3": - stage = "a1a3" - self.gwp = entity.get("gwp", {}).get(stage.lower(), 0) or 0 - self.odp = 0 - self.pocp = 0 - self.ap = 0 - self.ep = 0 - self.adpe = 0 - self.adpf = 0 - self.penr = 0 - self.per = 0 - self.senr = 0 - self.ser = 0 - - @staticmethod - def _get_mass_factor(entity: dict) -> float: - for conversion in entity.get("conversions"): - if conversion.get("to").casefold() == "kg": - return conversion.get("value") - - def as_dict(self) -> dict: - return { - "Node": { - "Stage": { - "id": self.id, - "name": { - "Danish": self.name, - "English": self.name, - "German": self.name, - }, - "comment": self.comment, - "source": self.source, - "locked": True, - "valid_to": self.valid_to, - "stage": self.stage, - "stage_unit": self.unit, - "indicator_unit": self.unit, - "stage_factor": 1.0, - "mass_factor": self.mass_factor, - "indicator_factor": 1.0, - "external_source": self.source, - "external_id": "", - "external_url": "", - "external_version": "", - "data_type": self.data_type, - "indicators": { - "GWP": self.gwp, - "ODP": self.odp, - "POCP": self.pocp, - "AP": self.ap, - "EP": self.ep, - "ADPE": self.adpe, - "ADPF": self.adpf, - "PENR": self.penr, - "PER": self.per, - "SENR": self.senr, - "SER": self.ser, - }, - } - } - } diff --git a/src/logic/export/lcabyg/utilities.py b/src/logic/export/lcabyg/utilities.py index 1b7089f..ac319bf 100644 --- a/src/logic/export/lcabyg/utilities.py +++ b/src/logic/export/lcabyg/utilities.py @@ -1,27 +1,56 @@ """Module with utilities for translating schema to LCAByg.""" + import logging -from enum import Enum +from enum import StrEnum +from assembly.models import Epd from models.schema_category import SchemaCategory from models.schema_element import SchemaElement logger = logging.getLogger(__name__) -class EntityTypes(Enum): +class EntityTypes(StrEnum): CONSTRUCTION = "Construction" ELEMENT = "Element" STAGE = "Stage" PRODUCT = "Product" + @classmethod + def _missing_(cls, value): + return _missing_str_enum_values(cls, value) + -class Units(Enum): +class Units(StrEnum): AREA = "M2" LENGTH = "M" MASS = "KG" - NONE = "Pcs" + NONE = "PCS" VOLUME = "M3" + @classmethod + def _missing_(cls, value): + return _missing_str_enum_values(cls, value) + + +class Stages(StrEnum): + A1TO3 = "A1to3" + C3 = "C3" + C4 = "C4" + D = "D" + + @classmethod + def _missing_(cls, value): + return _missing_str_enum_values(cls, value) + + +def _missing_str_enum_values(cls, value): + casefolded = value.casefold() + for member in cls: + if member.casefold() == casefolded: + return member + return None + def _translate_bim7aa_to_gendk( entity: SchemaCategory | SchemaElement, @@ -30,15 +59,19 @@ def _translate_bim7aa_to_gendk( Falls back to 'Other' if category does not exist. """ + type_id = None if isinstance(entity, SchemaElement): type_id = entity.schema_category.type_code_element.code - else: + elif isinstance(entity, SchemaCategory): type_id = entity.type_code_element.code + elif isinstance(entity, Epd): + type_id = "epd" gend_dk_uuid = _bim7aa_to_gendk_dict.get(str(type_id), None) if gend_dk_uuid is None: gend_dk_uuid = "069983d0-d08b-405b-b816-d28ca9648956" logger.warning(f"Could not find type ID {type_id}. Defaulting to {gend_dk_uuid} (other)") + return gend_dk_uuid @@ -46,6 +79,7 @@ def _translate_bim7aa_to_gendk( """ATTENTION: The translation from BIM7AA to LCAByg is not lossless.""" _bim7aa_to_gendk_dict = { + "epd": "f1bca522-df0d-4b74-a5de-cdecc2763d05", "1": "069983d0-d08b-405b-b816-d28ca9648956", "10": "069983d0-d08b-405b-b816-d28ca9648956", "10x": "069983d0-d08b-405b-b816-d28ca9648956", diff --git a/src/logic/export/to_lcabyg.py b/src/logic/export/to_lcabyg.py index 1710d3d..e119955 100644 --- a/src/logic/export/to_lcabyg.py +++ b/src/logic/export/to_lcabyg.py @@ -2,19 +2,18 @@ from sqlmodel import select import models.reporting_schema as models_schema -import models.reporting_schema as models_reporting import models.schema_category as models_category import models.schema_element as models_element +from assembly.models import Assembly +from logic.export.lcabyg.graph import LcaBygGraphEntity +from logic.export.lcabyg.model_builder import LCAbygModelBuilder -from .lcabyg.edges import create_edge -from .lcabyg.models import Entity -from .lcabyg.nodes import create_node -from .utils import query_assemblies_for_export, query_project_for_export +from .utils import query_assemblies_for_export async def query_for_lca_byg_export( reporting_schema_id: str, session, token: str -) -> tuple[list[models_category.SchemaCategory], list[dict] | None]: +) -> tuple[list[models_category.SchemaCategory], list[Assembly] | None]: """Query the database for SchemaCategories and the required attributes for LCAByg export.""" query = ( @@ -31,49 +30,43 @@ async def query_for_lca_byg_export( schema_categories = (await session.exec(query)).all() reporting_schema = schema_categories[0].reporting_schema assemblies = await query_assemblies_for_export(reporting_schema.project_id, token) + assemblies = [Assembly.from_dict(a) for a in (assemblies or [])] return schema_categories, assemblies def aggregate_lcabyg_models( schema_categories: list[models_schema.SchemaCategory], - assemblies: list[dict], -) -> list[Entity]: + assemblies: list[Assembly], +) -> list[LcaBygGraphEntity]: """ Return list of Edges and Nodes from schema. Edges from GenDK to LCAByg Elements and Constructions are automatically inferred from the database. """ - entity_list = [] - layer_nodes = {} + builder = LCAbygModelBuilder("BIM7AA") + + assemblyToElements: dict[str, list] = {} + for category in schema_categories: - if category.elements == []: # pragma: no branch + if category.elements == []: continue - category_node = create_node(category) - entity_list.extend([category_node, create_edge(category_node)]) + builder.with_element(category) for element in category.elements: - element_node = create_node(element) - - entity_list.extend([element_node, create_edge(element_node, category_node), create_edge(element_node)]) + builder.with_construction(element, category) if element.assembly_id: - for assembly in assemblies: - if assembly["id"] == element.assembly_id: - for layer in assembly.get("layers", []): - - if layer["id"] not in layer_nodes: - layer_node = create_node(layer) - layer_nodes[layer["id"]] = layer_node - entity_list.append(layer_node) - for phase in ["A1to3", "C3", "C4", "D"]: - phase_node = create_node((layer, phase)) - entity_list.extend( - [phase_node, create_edge(phase_node, layer_node), create_edge(phase_node)] # ]) - ) - else: - layer_node = layer_nodes[layer["id"]] - - entity_list.append(create_edge(layer_node, element_node)) - - return entity_list + assemblyToElements.setdefault(element.assembly_id, []).append(element) + + for assembly in assemblies: + builder.with_construction(assembly) + + for layer in assembly.layers: + builder.with_product(layer, assembly) + + for element in assemblyToElements.get(assembly.id, []): + for layer in assembly.layers: + builder.with_product(layer, element) + + return builder.build() diff --git a/src/logic/export/utils.py b/src/logic/export/utils.py index 4117894..9f3a668 100644 --- a/src/logic/export/utils.py +++ b/src/logic/export/utils.py @@ -45,33 +45,29 @@ async def query_assemblies_for_export(project_id: str, token: str) -> dict | Non projectAssemblies(projectId: $projectId) { id name - lifeTime + lifetime : lifeTime unit - conversionFactor + conversion_factor : conversionFactor description layers { id name description - conversionFactor - referenceServiceLife - transportEpd { - id - name - } - transportDistance - transportConversionFactor + conversion_factor : conversionFactor + reference_service_life : referenceServiceLife + transport_distance : transportDistance + transport_conversion_factor : transportConversionFactor epd { id name - declaredUnit + declared_unit : declaredUnit version - validUntil - publishedDate + valid_until : validUntil + published_date : publishedDate source location subtype - referenceServiceLife + reference_service_life : referenceServiceLife comment conversions { to From 43d7498327643df8d5880d53b495757a98f566d8 Mon Sep 17 00:00:00 2001 From: LSJ Date: Fri, 17 May 2024 14:25:51 +0200 Subject: [PATCH 03/15] Removed tests of deleted code --- tests/unit/test_lcabyg_formats.py | 131 ------------------------------ 1 file changed, 131 deletions(-) delete mode 100644 tests/unit/test_lcabyg_formats.py diff --git a/tests/unit/test_lcabyg_formats.py b/tests/unit/test_lcabyg_formats.py deleted file mode 100644 index 9eb6c31..0000000 --- a/tests/unit/test_lcabyg_formats.py +++ /dev/null @@ -1,131 +0,0 @@ -import json -from pathlib import Path - -import pytest - -from logic.export.lcabyg.edges import Edge, create_edge -from logic.export.lcabyg.nodes import ConstructionNode, ElementNode, create_node -from logic.export.lcabyg.utilities import _bim7aa_to_gendk_dict -from logic.export.to_lcabyg import aggregate_lcabyg_models -from models.schema_category import SchemaCategory - - -@pytest.mark.asyncio -async def test_lcabyg_category_node(category: SchemaCategory): - """Test LCAByg node representing a SchemaCategory.""" - node = ElementNode(category) - assert node.element_category_id == "10a52123-48d7-466a-9622-d463511a6df0" # GenDK category ID - node_dict = node.as_dict() - assert node_dict == { - "Node": { - "Element": { - "id": category.id, - "name": { - "Danish": category.type_code_element.name, - "English": category.type_code_element.name, - "German": category.type_code_element.name, - }, - "active": True, - "comment": category.description, - "enabled": True, - "source": "User", - } - } - } - - -@pytest.mark.asyncio -async def test_lcabyg_element_node(category: SchemaCategory): - """Test LCAByg node representing a SchemaCategory.""" - element = category.elements[0] - node = create_node(element) - node_dict = node.as_dict() - assert node_dict == { - "Node": { - "Construction": { - "id": element.id, - "name": { - "Danish": element.name, - "English": element.name, - "German": element.name, - }, - "comment": element.description, - "layer": 1, - "locked": True, - "source": "User", - "unit": "M3", - } - } - } - - -@pytest.mark.asyncio -async def test_lcabyg_element_to_construction_edge(category: SchemaCategory): - """Test edge between element node and construction node.""" - category_node = ElementNode(category) - element_node = ConstructionNode(category.elements[0]) - edge = create_edge(element_node, category_node) - edge_dict = edge.as_dict() - assert edge_dict == { - "Edge": [ - { - "ElementToConstruction": { - "amount": category.elements[0].quantity, - "enabled": True, - "id": edge.id, - } - }, - category.id, - category.elements[0].id, - ] - } - - -@pytest.mark.asyncio -async def test_lcabyg_category_to_construction_edge(category: SchemaCategory): - """Test edge between GenDK category and construction node.""" - element_node = ConstructionNode(category.elements[0]) - edge = create_edge(element_node) - edge_dict = edge.as_dict() - assert edge_dict == { - "Edge": [ - { - "CategoryToConstruction": { - "id": edge.id, - "layers": [1], - } - }, - _bim7aa_to_gendk_dict["211"], - category.elements[0].id, - ] - } - - -@pytest.mark.asyncio -async def test_lcabyg_category_to_element_edge(category: SchemaCategory): - """Test edge between GenDK category and element node.""" - category_node = create_node(category) - edge = create_edge(category_node) - edge_dict = edge.as_dict() - - assert edge_dict == { - "Edge": [ - { - "CategoryToElement": { - "enabled": True, - "id": edge.id, - } - }, - _bim7aa_to_gendk_dict["211"], - category.id, - ] - } - - -@pytest.mark.asyncio -async def test_aggregate_lcabyg_models(datafix_dir, category: SchemaCategory): - project = json.loads((datafix_dir / "project_export.json").read_text())["data"]["projects"][0] - assemblies = json.loads((datafix_dir / "assembly_export.json").read_text())["data"]["assemblies"] - - lcabyg_data = aggregate_lcabyg_models([category], assemblies) - assert lcabyg_data From 5d1d9ea4069135aa7f242374991185aa07ffec83 Mon Sep 17 00:00:00 2001 From: LSJ Date: Fri, 17 May 2024 14:40:16 +0200 Subject: [PATCH 04/15] Hopefully fix broken unit test --- .../test_schema_export_endpoint.py | 51 ++----------------- 1 file changed, 4 insertions(+), 47 deletions(-) diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 2b37fce..496dfa6 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -5,16 +5,10 @@ import pytest from httpx import AsyncClient from lcax.pydantic import LCAxProject -from sqlalchemy.orm import selectinload -from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from core.config import settings -from logic.export.lcabyg.edges import Edge, create_edge -from logic.export.lcabyg.nodes import Node, create_node from models.reporting_schema import ReportingSchema -from models.schema_category import SchemaCategory -from models.schema_element import SchemaElement from models.source import ProjectSource @@ -24,38 +18,9 @@ def mock_uuid(mocker): mocker.patch("logic.export.lcabyg.models.string_uuid", return_value="test") -@pytest.fixture -@pytest.mark.asyncio -async def expected_entities(mock_uuid, db, schema_elements, schema_categories) -> tuple[Node, Edge]: - """Generate expected entities.""" - async with AsyncSession(db) as session: - query = ( - select(SchemaCategory) - .options(selectinload(SchemaCategory.type_code_element)) - .options(selectinload(SchemaCategory.elements).options(selectinload(SchemaElement.schema_category))) - .options(selectinload(SchemaCategory.reporting_schema)) - ) - schema_categories = (await session.exec(query)).all() - category = schema_categories[0] - element = schema_categories[0].elements[0] - entity_tuple = ( - category_node := create_node(category), - element_node := create_node(element), - create_edge(element_node, category_node), - create_edge(element_node), - create_edge(category_node), - ) - return entity_tuple - - @pytest.mark.asyncio async def test_export_schema_to_lcabyg( - db, - client: AsyncClient, - reporting_schemas: list[ReportingSchema], - expected_entities: tuple[Node | Edge], - get_response: Callable, - query_assemblies_for_export_mock, + client: AsyncClient, reporting_schemas: list[ReportingSchema], get_response: Callable ): query = """ query ExportReportingSchema($reportingSchemaId: String!, $exportFormat: exportFormat!){ @@ -73,12 +38,12 @@ async def test_export_schema_to_lcabyg( lcabyg_list = json.loads(base64.b64decode(data["exportReportingSchema"])) # Test if correct number of entities is returned - if (a := len(lcabyg_list)) != (b := len(expected_entities)): + if (a := len(lcabyg_list)) != (b := 5): raise AssertionError(f"Expected {b} entities but got {a}.") @pytest.mark.asyncio -async def test_csv_export(client, db, reporting_schemas, schema_elements, schema_categories): +async def test_csv_export(client, db, reporting_schemas, schema_elements): query = """ query ExportReportingSchema($reportingSchemaId: String!, $exportFormat: exportFormat!){ exportReportingSchema(reportingSchemaId: $reportingSchemaId, exportFormat: $exportFormat) @@ -125,15 +90,7 @@ async def test_csv_export(client, db, reporting_schemas, schema_elements, schema @pytest.mark.asyncio -async def test_lcax_export( - client, - db, - reporting_schemas, - schema_elements, - schema_categories, - query_project_for_export_mock, - query_assemblies_for_export_mock, -): +async def test_lcax_export(client, reporting_schemas): query = """ query ExportReportingSchema($reportingSchemaId: String!, $exportFormat: exportFormat!){ exportReportingSchema(reportingSchemaId: $reportingSchemaId, exportFormat: $exportFormat) From de1ab984f6ea6c3cdbe5d06d37ae5bce1bdefc96 Mon Sep 17 00:00:00 2001 From: LSJ Date: Fri, 17 May 2024 15:02:18 +0200 Subject: [PATCH 05/15] xRevert "Hopefully fix broken unit test" This reverts commit 5d1d9ea4069135aa7f242374991185aa07ffec83. --- .../test_schema_export_endpoint.py | 51 +++++++++++++++++-- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 496dfa6..2b37fce 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -5,10 +5,16 @@ import pytest from httpx import AsyncClient from lcax.pydantic import LCAxProject +from sqlalchemy.orm import selectinload +from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from core.config import settings +from logic.export.lcabyg.edges import Edge, create_edge +from logic.export.lcabyg.nodes import Node, create_node from models.reporting_schema import ReportingSchema +from models.schema_category import SchemaCategory +from models.schema_element import SchemaElement from models.source import ProjectSource @@ -18,9 +24,38 @@ def mock_uuid(mocker): mocker.patch("logic.export.lcabyg.models.string_uuid", return_value="test") +@pytest.fixture +@pytest.mark.asyncio +async def expected_entities(mock_uuid, db, schema_elements, schema_categories) -> tuple[Node, Edge]: + """Generate expected entities.""" + async with AsyncSession(db) as session: + query = ( + select(SchemaCategory) + .options(selectinload(SchemaCategory.type_code_element)) + .options(selectinload(SchemaCategory.elements).options(selectinload(SchemaElement.schema_category))) + .options(selectinload(SchemaCategory.reporting_schema)) + ) + schema_categories = (await session.exec(query)).all() + category = schema_categories[0] + element = schema_categories[0].elements[0] + entity_tuple = ( + category_node := create_node(category), + element_node := create_node(element), + create_edge(element_node, category_node), + create_edge(element_node), + create_edge(category_node), + ) + return entity_tuple + + @pytest.mark.asyncio async def test_export_schema_to_lcabyg( - client: AsyncClient, reporting_schemas: list[ReportingSchema], get_response: Callable + db, + client: AsyncClient, + reporting_schemas: list[ReportingSchema], + expected_entities: tuple[Node | Edge], + get_response: Callable, + query_assemblies_for_export_mock, ): query = """ query ExportReportingSchema($reportingSchemaId: String!, $exportFormat: exportFormat!){ @@ -38,12 +73,12 @@ async def test_export_schema_to_lcabyg( lcabyg_list = json.loads(base64.b64decode(data["exportReportingSchema"])) # Test if correct number of entities is returned - if (a := len(lcabyg_list)) != (b := 5): + if (a := len(lcabyg_list)) != (b := len(expected_entities)): raise AssertionError(f"Expected {b} entities but got {a}.") @pytest.mark.asyncio -async def test_csv_export(client, db, reporting_schemas, schema_elements): +async def test_csv_export(client, db, reporting_schemas, schema_elements, schema_categories): query = """ query ExportReportingSchema($reportingSchemaId: String!, $exportFormat: exportFormat!){ exportReportingSchema(reportingSchemaId: $reportingSchemaId, exportFormat: $exportFormat) @@ -90,7 +125,15 @@ async def test_csv_export(client, db, reporting_schemas, schema_elements): @pytest.mark.asyncio -async def test_lcax_export(client, reporting_schemas): +async def test_lcax_export( + client, + db, + reporting_schemas, + schema_elements, + schema_categories, + query_project_for_export_mock, + query_assemblies_for_export_mock, +): query = """ query ExportReportingSchema($reportingSchemaId: String!, $exportFormat: exportFormat!){ exportReportingSchema(reportingSchemaId: $reportingSchemaId, exportFormat: $exportFormat) From 2007d8b47f1cc44f78e00d884b63218f6d81f833 Mon Sep 17 00:00:00 2001 From: LSJ Date: Fri, 17 May 2024 15:03:37 +0200 Subject: [PATCH 06/15] second attempt at fixing broken unit tests --- .../test_schema_export_endpoint.py | 26 +------------------ 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 2b37fce..12baa89 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -24,30 +24,6 @@ def mock_uuid(mocker): mocker.patch("logic.export.lcabyg.models.string_uuid", return_value="test") -@pytest.fixture -@pytest.mark.asyncio -async def expected_entities(mock_uuid, db, schema_elements, schema_categories) -> tuple[Node, Edge]: - """Generate expected entities.""" - async with AsyncSession(db) as session: - query = ( - select(SchemaCategory) - .options(selectinload(SchemaCategory.type_code_element)) - .options(selectinload(SchemaCategory.elements).options(selectinload(SchemaElement.schema_category))) - .options(selectinload(SchemaCategory.reporting_schema)) - ) - schema_categories = (await session.exec(query)).all() - category = schema_categories[0] - element = schema_categories[0].elements[0] - entity_tuple = ( - category_node := create_node(category), - element_node := create_node(element), - create_edge(element_node, category_node), - create_edge(element_node), - create_edge(category_node), - ) - return entity_tuple - - @pytest.mark.asyncio async def test_export_schema_to_lcabyg( db, @@ -73,7 +49,7 @@ async def test_export_schema_to_lcabyg( lcabyg_list = json.loads(base64.b64decode(data["exportReportingSchema"])) # Test if correct number of entities is returned - if (a := len(lcabyg_list)) != (b := len(expected_entities)): + if (a := len(lcabyg_list)) != (b := 5): raise AssertionError(f"Expected {b} entities but got {a}.") From f7f87b57c334f005be407afa2391068e76812604 Mon Sep 17 00:00:00 2001 From: LSJ Date: Fri, 17 May 2024 15:09:04 +0200 Subject: [PATCH 07/15] Third attempt at fixing broken unit tests --- tests/integration/test_schema_export_endpoint.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 12baa89..02adbc5 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -10,8 +10,6 @@ from sqlmodel.ext.asyncio.session import AsyncSession from core.config import settings -from logic.export.lcabyg.edges import Edge, create_edge -from logic.export.lcabyg.nodes import Node, create_node from models.reporting_schema import ReportingSchema from models.schema_category import SchemaCategory from models.schema_element import SchemaElement @@ -29,7 +27,6 @@ async def test_export_schema_to_lcabyg( db, client: AsyncClient, reporting_schemas: list[ReportingSchema], - expected_entities: tuple[Node | Edge], get_response: Callable, query_assemblies_for_export_mock, ): From 444becfb6ed37c5d9275a17d16b5632ef2e9ff8c Mon Sep 17 00:00:00 2001 From: Jacob Aagaard Date: Mon, 27 May 2024 13:12:46 +0200 Subject: [PATCH 08/15] Removing LCAx tests --- .../test_schema_export_endpoint.py | 41 ------------------- tests/unit/test_lcax_export.py | 11 ----- 2 files changed, 52 deletions(-) delete mode 100644 tests/unit/test_lcax_export.py diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 02adbc5..9c5cec9 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -95,44 +95,3 @@ async def test_csv_export(client, db, reporting_schemas, schema_elements, schema ] ) assert csv_rows[1] == expected_data - - -@pytest.mark.asyncio -async def test_lcax_export( - client, - db, - reporting_schemas, - schema_elements, - schema_categories, - query_project_for_export_mock, - query_assemblies_for_export_mock, -): - query = """ - query ExportReportingSchema($reportingSchemaId: String!, $exportFormat: exportFormat!){ - exportReportingSchema(reportingSchemaId: $reportingSchemaId, exportFormat: $exportFormat) - } - """ - - response = await client.post( - f"{settings.API_STR}/graphql", - json={ - "query": query, - "variables": { - "reportingSchemaId": reporting_schemas[0].id, - "exportFormat": "LCAX", - }, - }, - ) - - assert response.status_code == 200 - - data = response.json() - - assert data.get("errors") is None - assert isinstance(data["data"]["exportReportingSchema"], str) - - lcax_data = base64.b64decode(data["data"]["exportReportingSchema"]).decode("utf-8") - assert lcax_data - - lcax_project = LCAxProject(**json.loads(lcax_data)) - assert lcax_project diff --git a/tests/unit/test_lcax_export.py b/tests/unit/test_lcax_export.py deleted file mode 100644 index 57771bd..0000000 --- a/tests/unit/test_lcax_export.py +++ /dev/null @@ -1,11 +0,0 @@ -import json - -from logic.export.to_lcax import get_life_cycle_stages - - -def test_get_life_cycle_stages(datafix_dir): - project = json.loads((datafix_dir / "project_export.json").read_text())["data"]["projects"][0] - - stages = get_life_cycle_stages(project) - - assert stages == ["a1a3", "c3", "c4", "d"] From 1f7b36fd28ece9c7dd306fec2568fda526103056 Mon Sep 17 00:00:00 2001 From: Jacob Aagaard Date: Mon, 27 May 2024 15:49:18 +0200 Subject: [PATCH 09/15] Replace hardcoded string with actual ExportFormat --- tests/integration/test_schema_export_endpoint.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 9c5cec9..4b9e62b 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -14,6 +14,7 @@ from models.schema_category import SchemaCategory from models.schema_element import SchemaElement from models.source import ProjectSource +from schema.export import ExportFormat @pytest.fixture @@ -37,7 +38,7 @@ async def test_export_schema_to_lcabyg( """ variables = { "reportingSchemaId": reporting_schemas[0].id, - "exportFormat": "LCABYG", + "exportFormat": ExportFormat.LCABYG, } data = await get_response(client, query, variables=variables) From 8a4a1601f7cdf321d8cb5ab1240a76210c373f69 Mon Sep 17 00:00:00 2001 From: Jacob Aagaard Date: Mon, 27 May 2024 15:52:19 +0200 Subject: [PATCH 10/15] Fix missing value extraction for enum --- tests/integration/test_schema_export_endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 4b9e62b..63bf72d 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -38,7 +38,7 @@ async def test_export_schema_to_lcabyg( """ variables = { "reportingSchemaId": reporting_schemas[0].id, - "exportFormat": ExportFormat.LCABYG, + "exportFormat": ExportFormat.LCABYG.value, } data = await get_response(client, query, variables=variables) From e846efa9e22387271a8b9b47b6e1992bf6f192e2 Mon Sep 17 00:00:00 2001 From: Jacob Aagaard Date: Mon, 27 May 2024 16:06:54 +0200 Subject: [PATCH 11/15] Fix typo --- src/schema/export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/schema/export.py b/src/schema/export.py index 2ff35b2..d9cabdf 100644 --- a/src/schema/export.py +++ b/src/schema/export.py @@ -15,7 +15,7 @@ class ExportFormat(Enum): """Available export formats.""" - LCABYG = "lcayg" + LCABYG = "lcabyg" CSV = "csv" LCAX = "lcax" From 22c159137d2ca82257c0aea9b4f153c3072e5f96 Mon Sep 17 00:00:00 2001 From: Jacob Aagaard Date: Mon, 27 May 2024 16:12:07 +0200 Subject: [PATCH 12/15] Fix use of enum --- tests/integration/test_schema_export_endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 63bf72d..4b9e62b 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -38,7 +38,7 @@ async def test_export_schema_to_lcabyg( """ variables = { "reportingSchemaId": reporting_schemas[0].id, - "exportFormat": ExportFormat.LCABYG.value, + "exportFormat": ExportFormat.LCABYG, } data = await get_response(client, query, variables=variables) From 37eed42c5fbdbe9ecaef1ed9e20ef3df9505f6a3 Mon Sep 17 00:00:00 2001 From: Jacob Aagaard Date: Mon, 27 May 2024 16:15:12 +0200 Subject: [PATCH 13/15] Prop extraction --- tests/integration/test_schema_export_endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 4b9e62b..40e95fe 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -38,7 +38,7 @@ async def test_export_schema_to_lcabyg( """ variables = { "reportingSchemaId": reporting_schemas[0].id, - "exportFormat": ExportFormat.LCABYG, + "exportFormat": ExportFormat.LCABYG.name, } data = await get_response(client, query, variables=variables) From a07f06fe2d79e19bacaf919fde6c9010fffcb65a Mon Sep 17 00:00:00 2001 From: Jacob Aagaard Date: Mon, 27 May 2024 16:34:33 +0200 Subject: [PATCH 14/15] Streamline test setup --- tests/integration/test_schema_export_endpoint.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 40e95fe..1637017 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -41,7 +41,13 @@ async def test_export_schema_to_lcabyg( "exportFormat": ExportFormat.LCABYG.name, } - data = await get_response(client, query, variables=variables) + data = await client.post( + f"{settings.API_STR}/graphql", + json={ + "query": query, + "variables": variables, + }, + ) assert isinstance(data["exportReportingSchema"], str) lcabyg_list = json.loads(base64.b64decode(data["exportReportingSchema"])) @@ -68,7 +74,7 @@ async def test_csv_export(client, db, reporting_schemas, schema_elements, schema "query": query, "variables": { "reportingSchemaId": reporting_schemas[0].id, - "exportFormat": "CSV", + "exportFormat": ExportFormat.CSV.name, }, }, ) From ed789cc1f3d46dc51816d98b04fdc355da05b892 Mon Sep 17 00:00:00 2001 From: Jacob Aagaard Date: Mon, 27 May 2024 16:43:50 +0200 Subject: [PATCH 15/15] Streamline test setup v2 --- tests/integration/test_schema_export_endpoint.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_schema_export_endpoint.py b/tests/integration/test_schema_export_endpoint.py index 1637017..3476d26 100644 --- a/tests/integration/test_schema_export_endpoint.py +++ b/tests/integration/test_schema_export_endpoint.py @@ -41,14 +41,17 @@ async def test_export_schema_to_lcabyg( "exportFormat": ExportFormat.LCABYG.name, } - data = await client.post( + response = await client.post( f"{settings.API_STR}/graphql", json={ "query": query, "variables": variables, }, ) - assert isinstance(data["exportReportingSchema"], str) + assert response.status_code == 200 + + data = response.json() + assert isinstance(data["data"]["exportReportingSchema"], str) lcabyg_list = json.loads(base64.b64decode(data["exportReportingSchema"]))