Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
python 3.11.14
poetry 2.3.3
python 3.12.12
poetry 2.4.1
java liberica-1.8.0
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions src/dve/core_engine/backends/base/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ def read_to_entity_type(
"""
if entity_name == Iterator[dict[str, Any]]:
return self.read_to_py_iterator(
resource,
entity_name,
schema, # type: ignore
all_model_fields
resource, entity_name, schema, all_model_fields # type: ignore
)

self.raise_if_not_sensible_file(resource, entity_name)
Expand All @@ -133,11 +130,7 @@ def read_to_entity_type(
raise ReaderLacksEntityTypeSupport(entity_type=entity_type) from err

return reader_func(
self,
resource,
entity_name,
schema,
all_model_fields=all_model_fields # type: ignore
self, resource, entity_name, schema, all_model_fields=all_model_fields # type: ignore
)

def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
Expand Down
5 changes: 1 addition & 4 deletions src/dve/core_engine/backends/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ class UnableToParseCSVError(MessageBearingError):
"""An error raised when unable to parse a CSV file"""

def __init__(
self,
entity_name: str,
field_check_error_message: str,
field_check_error_code: str
self, entity_name: str, field_check_error_message: str, field_check_error_code: str
):
super().__init__(
messages=[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ def _ddb_filter_contract_errors(
"Entity": "STRING",
},
)
.filter(f"FailureType == 'record' AND Status != 'informational' AND Entity = '{entity_name}'") # pylint: disable=C0301
.filter(
f"FailureType == 'record' AND Status != 'informational' AND Entity = '{entity_name}'"
) # pylint: disable=C0301
.select("RecordIndex")
.distinct()
.order("RecordIndex asc")
Expand All @@ -286,9 +288,7 @@ def _ddb_filter_contract_errors(
return entity

filtered_entity = entity.join(
relevant_record_rejection_codes_rel,
condition="__record_index__ == RecordIndex",
how="anti"
relevant_record_rejection_codes_rel, condition="__record_index__ == RecordIndex", how="anti"
)
return filtered_entity

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(
quote_char=quotechar,
field_check=field_check,
field_check_error_code=field_check_error_code,
field_check_error_message=field_check_error_message
field_check_error_message=field_check_error_message,
)

def read_to_py_iterator(
Expand Down Expand Up @@ -254,7 +254,7 @@ def read_to_relation( # pylint: disable=unused-argument
resource=resource,
entity_name=entity_name,
schema=schema,
all_model_fields=all_model_fields
all_model_fields=all_model_fields,
)
entity = entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME])).distinct()
no_records = entity.shape[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
from pyspark.sql.types import StructType

from dve.core_engine.backends.base.reader import read_function
from dve.core_engine.backends.readers.csv import CSVFileReader
from dve.core_engine.backends.exceptions import EmptyFileError
from dve.core_engine.backends.implementations.spark.spark_helpers import (
get_type_from_annotation,
spark_record_index,
spark_write_parquet,
)
from dve.core_engine.backends.readers.csv import CSVFileReader
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import get_content_length

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
create_udf,
get_all_registered_udfs,
object_to_spark_literal,
spark_filter_contract_errors,
spark_read_parquet,
spark_record_index,
spark_write_parquet,
spark_filter_contract_errors,
)
from dve.core_engine.backends.implementations.spark.types import (
Joined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
from pyspark.sql.types import LongType, StructField, StructType
from typing_extensions import Annotated, Protocol, TypedDict, get_args, get_origin, get_type_hints

from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
from dve.common.error_utils import get_feedback_errors_uri
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.type_hints import URI, EntityName

Expand Down Expand Up @@ -380,12 +380,14 @@ def _spark_filter_contract_errors(
relevant_record_rejections_codes_df = (
self.spark_session.read.json(
path=contract_error_location,
schema=st.StructType([
st.StructField("RecordIndex", st.IntegerType()),
st.StructField("FailureType", st.StringType()),
st.StructField("Status", st.StringType()),
st.StructField("Entity", st.StringType()),
]),
schema=st.StructType(
[
st.StructField("RecordIndex", st.IntegerType()),
st.StructField("FailureType", st.StringType()),
st.StructField("Status", st.StringType()),
st.StructField("Entity", st.StringType()),
]
),
)
.filter(
(sf.col("FailureType") == sf.lit("record"))
Expand Down
2 changes: 1 addition & 1 deletion src/dve/core_engine/backends/metadata/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def schemas(self) -> dict[EntityName, type[BaseModel]]:
"""The per-entity schemas, as pydantic models."""
if not self._schemas:
for entity_name, validator in self.validators.items():
self._schemas[entity_name] = validator.model # type: ignore # pylint: disable=E1137
self._schemas[entity_name] = validator.model # type: ignore # pylint: disable=E1137
return self._schemas.copy() # pylint: disable=E1101

@root_validator(allow_reuse=True)
Expand Down
4 changes: 2 additions & 2 deletions src/dve/core_engine/backends/readers/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
MissingHeaderError,
)
from dve.core_engine.backends.readers.utilities import (
raise_message_bearing_error_on_header_differences
get_all_model_fields,
raise_message_bearing_error_on_header_differences,
)
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.backends.readers.utilities import get_all_model_fields
from dve.core_engine.type_hints import EntityName
from dve.parser.file_handling import get_content_length, open_stream
from dve.parser.file_handling.implementations.file import file_uri_to_local_path
Expand Down
14 changes: 7 additions & 7 deletions src/dve/core_engine/backends/readers/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ def raise_message_bearing_error_on_header_differences(
header or vice versa.
"""
missing, additional = check_csv_header_expected(
resource,
expected_schema,
all_model_fields,
delimiter,
quote_char
resource, expected_schema, all_model_fields, delimiter, quote_char
)

if missing or additional:
record_details_missing = f"missing fields: {', '.join(sorted(missing))};" if missing else "" # pylint: disable=C0301
record_details_additional = f"additional fields: {', '.join(sorted(additional))};" if additional else "" # pylint: disable=C0301
record_details_missing = (
f"missing fields: {', '.join(sorted(missing))};" if missing else ""
) # pylint: disable=C0301
record_details_additional = (
f"additional fields: {', '.join(sorted(additional))};" if additional else ""
) # pylint: disable=C0301
raise MessageBearingError(
"The CSV header doesn't match what is expected",
messages=[
Expand Down
4 changes: 3 additions & 1 deletion src/dve/core_engine/configuration/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,9 @@ def _load_rules_and_vars(self) -> tuple[list[Rule], list[TemplateVariables]]:
rules, local_variable_list = [], []
added_rules: set[RuleName] = set()

for index, complex_rule_config in enumerate(self.transformations.complex_rules): # pylint: disable=E1101
for index, complex_rule_config in enumerate(
self.transformations.complex_rules
): # pylint: disable=E1101
rule, local_params, deps = self._resolve_business_rule(complex_rule_config)
missing_rules = deps - added_rules
if missing_rules:
Expand Down
16 changes: 12 additions & 4 deletions src/dve/core_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ def __exit__(
exc_value: Optional[Exception],
traceback: Optional[TracebackType],
) -> None:
self.main_log.info(f"Exiting pipeline context, clearing {self.cache_prefix!r}") # pylint: disable=E1101
self.main_log.info(
f"Exiting pipeline context, clearing {self.cache_prefix!r}"
) # pylint: disable=E1101
cache_dir = self._cache_dir
self._cache_dir = None

Expand Down Expand Up @@ -198,17 +200,23 @@ def _write_entity_outputs(self, entities: SparkEntities) -> SparkEntities:
"""
output_entities = {}

self.main_log.info(f"Writing entities to the output location: {self.output_prefix_uri}") # pylint: disable=E1101
self.main_log.info(
f"Writing entities to the output location: {self.output_prefix_uri}"
) # pylint: disable=E1101
for entity_name, entity in entities.items():
entity = entity.drop(RECORD_INDEX_COLUMN_NAME)

self.main_log.info(f"Entity: {entity_name} {type(entity)}") # pylint: disable=E1101

output_uri = joinuri(self.output_prefix_uri, entity_name)
if get_resource_exists(output_uri):
self.main_log.info(f"{output_uri} already exists - will be overwritten") # pylint: disable=E1101
self.main_log.info(
f"{output_uri} already exists - will be overwritten"
) # pylint: disable=E1101

self.main_log.info(f"+ Writing parquet output to {output_uri!r}") # pylint: disable=E1101
self.main_log.info(
f"+ Writing parquet output to {output_uri!r}"
) # pylint: disable=E1101
entity.write.mode("overwrite").parquet(output_uri)
spark_session = SparkSession.builder.getOrCreate()
output_entities[entity_name] = spark_session.read.format("parquet").load(
Expand Down
20 changes: 5 additions & 15 deletions src/dve/core_engine/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
"""Define custom error codes for validation issues raised during the data contract phase"""

error_code: str
error_level: Optional[FailureType] = "record"
is_informational: Optional[bool] = False
error_message: Optional[str] = None
reporting_entity: Optional[str] = None

Expand Down Expand Up @@ -247,26 +249,14 @@
messages: Messages = []
for error_dict in error.errors():
error_type = error_dict["type"]
# TODO - review in pydantic v2 - how handles null vs not provided values

Check warning on line 252 in src/dve/core_engine/message.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this "TODO" comment.

See more on https://sonarcloud.io/project/issues?id=NHSDigital_data-validation-engine&issues=AZ8XuJ_GmHX9CG1pCnBx&open=AZ8XuJ_GmHX9CG1pCnBx&pullRequest=118
if "none.not_allowed" in error_type or "value_error.missing" in error_type:
category = "Blank"
else:
category = "Bad value"
error_code = error_type
if "." in error_code:
error_code = error_code.split(".", 1)[-1]

if error_code in INTEGRITY_ERROR_CODES:
failure_type: FailureType = "integrity"
elif error_code in SUBMISSION_ERROR_CODES:
failure_type = "submission"
else:
failure_type = "record"

error_field = ".".join([idx for idx in error_dict["loc"] if not isinstance(idx, int)])

is_informational = False
if error_code.endswith("warning"):
is_informational = True
error_detail: DataContractErrorDetail = error_details.get( # type: ignore
error_field, DEFAULT_ERROR_DETAIL
).get(category)
Expand All @@ -276,8 +266,8 @@
entity=error_detail.reporting_entity or entity,
original_entity=entity,
record=record,
failure_type=failure_type,
is_informational=is_informational,
failure_type=error_detail.error_level, # type: ignore
is_informational=error_detail.is_informational, # type: ignore
error_type=error_type,
error_location=error_dict["loc"], # type: ignore
error_message=error_detail.template_message(record, error_dict["loc"]),
Expand Down
2 changes: 1 addition & 1 deletion src/dve/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def write_file_to_parquet(
submission_file_uri,
model_name,
stringify_model(model), # type: ignore
get_all_model_fields(models.values()) # type: ignore
get_all_model_fields(models.values()), # type: ignore
),
f"{out}{model_name}",
)
Expand Down
16 changes: 11 additions & 5 deletions src/dve/reporting/excel_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,17 @@ def _add_submission_info(self, status: str, summary: Worksheet):
for key, value in self.summary_dict.items():
summary.append(["", _key_renames.get(key, key), str(value)])

summary.append([
"",
"Total Number of Records Processed",
self.submission_status.number_of_records if self.submission_status.number_of_records else 0 # pylint: disable=C0301
])
summary.append(
[
"",
"Total Number of Records Processed",
(
self.submission_status.number_of_records
if self.submission_status.number_of_records
else 0
), # pylint: disable=C0301
]
)
summary.append(["", ""])


Expand Down
Loading
Loading