From 81856683768900d8ee39e65165d7f470481647d6 Mon Sep 17 00:00:00 2001 From: "george.robertson1" <50412379+georgeRobertson@users.noreply.github.com> Date: Tue, 30 Jun 2026 13:42:17 +0100 Subject: [PATCH 1/2] refactor: change submission rejection to file rejection in the error reports --- src/dve/pipeline/pipeline.py | 19 +++++++++++++------ src/dve/reporting/constants.py | 21 +++++++++++++++++++++ src/dve/reporting/error_report.py | 3 ++- src/dve/reporting/excel_report.py | 5 +++-- tests/test_pipeline/test_spark_pipeline.py | 2 +- 5 files changed, 40 insertions(+), 10 deletions(-) create mode 100644 src/dve/reporting/constants.py diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 1c32e87..5656bfa 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -42,6 +42,7 @@ from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation from dve.parser.file_handling.service import _get_implementation from dve.pipeline.utils import SubmissionStatus, deadletter_file, load_config, load_reader +from dve.reporting.constants import ErrorReportStatus from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates PERMISSIBLE_EXCEPTIONS: tuple[type[Exception]] = ( @@ -764,8 +765,8 @@ def _get_error_dataframes(self, submission_id: str): pl.when(pl.col("Status") == pl.lit("informational")) .then(pl.lit("Warning")) .when(pl.col("FailureType") == pl.lit("submission")) # type: ignore - .then(pl.lit("Submission Failure")) # type: ignore - .otherwise(pl.lit("Record Rejection")) # type: ignore + .then(pl.lit(ErrorReportStatus.FILE_REJECTION.reporting_name)) # type: ignore + .otherwise(pl.lit(ErrorReportStatus.RECORD_REJECTION.reporting_name)) # type: ignore .alias("error_type") # type: ignore ) df = df.select( @@ -828,9 +829,15 @@ def error_report( sub_stats = SubmissionStatisticsRecord( submission_id=submission_info.submission_id, record_count=submission_status.number_of_records, - number_submission_rejections=err_types.get("Submission Failure", 0), - number_record_rejections=err_types.get("Record Rejection", 0), - number_warnings=err_types.get("Warning", 0), + number_submission_rejections=err_types.get( + ErrorReportStatus.FILE_REJECTION.reporting_name, + 0 + ), + number_record_rejections=err_types.get( + ErrorReportStatus.RECORD_REJECTION.reporting_name, + 0 + ), + number_warnings=err_types.get(ErrorReportStatus.WARNING.reporting_name, 0), ) summary_dict = { @@ -841,7 +848,7 @@ def error_report( summary_items = er.SummaryItems( submission_status=submission_status, summary_dict=summary_dict, - row_headings=["Submission Failure", "Record Rejection", "Warning"], + row_headings=[e.reporting_name for e in ErrorReportStatus], ) workbook = er.ExcelFormat( diff --git a/src/dve/reporting/constants.py b/src/dve/reporting/constants.py new file mode 100644 index 0000000..1837524 --- /dev/null +++ b/src/dve/reporting/constants.py @@ -0,0 +1,21 @@ +""" +Constants used within the error reports +""" + +from enum import Enum + + +class ErrorReportStatus(Enum): + """ + Constant to centrally hold error report status. + """ + FILE_REJECTION = 1, "File Rejection" + RECORD_REJECTION = 2, "Record Rejection" + WARNING = 3, "Warning" + + @property + def reporting_name(self): + """ + The error report 'friendly' name. + """ + return self.value[1] diff --git a/src/dve/reporting/error_report.py b/src/dve/reporting/error_report.py index 9e947bf..8169aba 100644 --- a/src/dve/reporting/error_report.py +++ b/src/dve/reporting/error_report.py @@ -11,6 +11,7 @@ from dve.common.error_utils import conditional_cast from dve.core_engine.message import FeedbackMessage from dve.parser.file_handling.service import open_stream +from dve.reporting.constants import ErrorReportStatus ERROR_SCHEMA = { "Table": Utf8(), @@ -85,7 +86,7 @@ def create_error_dataframe(errors: deque[FeedbackMessage], key_fields): df = df.with_columns( # type: ignore pl.when(pl.col("Status") == pl.lit("error")) # type: ignore - .then(pl.lit("Submission Failure")) # type: ignore + .then(pl.lit(ErrorReportStatus.FILE_REJECTION.reporting_name)) # type: ignore .otherwise(pl.lit("Warning")) # type: ignore .alias("error_type") ) diff --git a/src/dve/reporting/excel_report.py b/src/dve/reporting/excel_report.py index 9471c83..7e9f7f4 100644 --- a/src/dve/reporting/excel_report.py +++ b/src/dve/reporting/excel_report.py @@ -17,6 +17,7 @@ from polars.exceptions import ColumnNotFoundError from dve.pipeline.utils import SubmissionStatus +from dve.reporting.constants import ErrorReportStatus @dataclass @@ -97,9 +98,9 @@ def get_submission_status(self, aggregates: DataFrame) -> str: if aggregates.is_empty(): return "File has been accepted, no issues to report" failures = aggregates["Type"].unique() - if "Submission Failure" in failures: + if ErrorReportStatus.FILE_REJECTION.reporting_name in failures: status = "File has been rejected" - elif "Warning" in failures: + elif ErrorReportStatus.WARNING.reporting_name in failures: status = "File has been accepted, all records accepted with warnings" else: status = "File has been accepted, no issues to report" diff --git a/tests/test_pipeline/test_spark_pipeline.py b/tests/test_pipeline/test_spark_pipeline.py index 063ced7..0cf7fe6 100644 --- a/tests/test_pipeline/test_spark_pipeline.py +++ b/tests/test_pipeline/test_spark_pipeline.py @@ -440,7 +440,7 @@ def test_error_report_where_report_is_expected( # pylint: disable=redefined-out ("File Name", "doesnotmatter"), ("File Extension", "json"), ("Total Number of Records Processed", "9"), - ("Submission Failure", "0"), + ("File Rejection", "0"), ("Record Rejection", "2"), ("Warning", "0"), ] From 802d6ee196188eaa80dcedf83e4c74ee7d8df28c Mon Sep 17 00:00:00 2001 From: "george.robertson1" <50412379+georgeRobertson@users.noreply.github.com> Date: Tue, 30 Jun 2026 14:45:40 +0100 Subject: [PATCH 2/2] style: formatting with black and isort --- src/dve/pipeline/pipeline.py | 8 +++----- src/dve/reporting/constants.py | 1 + src/dve/reporting/excel_report.py | 16 +++++++++++----- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 5656bfa..7a43391 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -224,7 +224,7 @@ def write_file_to_parquet( submission_file_uri, model_name, stringify_model(model), # type: ignore - get_all_model_fields(models.values()) # type: ignore + get_all_model_fields(models.values()), # type: ignore ), f"{out}{model_name}", ) @@ -830,12 +830,10 @@ def error_report( submission_id=submission_info.submission_id, record_count=submission_status.number_of_records, number_submission_rejections=err_types.get( - ErrorReportStatus.FILE_REJECTION.reporting_name, - 0 + ErrorReportStatus.FILE_REJECTION.reporting_name, 0 ), number_record_rejections=err_types.get( - ErrorReportStatus.RECORD_REJECTION.reporting_name, - 0 + ErrorReportStatus.RECORD_REJECTION.reporting_name, 0 ), number_warnings=err_types.get(ErrorReportStatus.WARNING.reporting_name, 0), ) diff --git a/src/dve/reporting/constants.py b/src/dve/reporting/constants.py index 1837524..657375e 100644 --- a/src/dve/reporting/constants.py +++ b/src/dve/reporting/constants.py @@ -9,6 +9,7 @@ class ErrorReportStatus(Enum): """ Constant to centrally hold error report status. """ + FILE_REJECTION = 1, "File Rejection" RECORD_REJECTION = 2, "Record Rejection" WARNING = 3, "Warning" diff --git a/src/dve/reporting/excel_report.py b/src/dve/reporting/excel_report.py index 7e9f7f4..de264d5 100644 --- a/src/dve/reporting/excel_report.py +++ b/src/dve/reporting/excel_report.py @@ -142,11 +142,17 @@ def _add_submission_info(self, status: str, summary: Worksheet): for key, value in self.summary_dict.items(): summary.append(["", _key_renames.get(key, key), str(value)]) - summary.append([ - "", - "Total Number of Records Processed", - self.submission_status.number_of_records if self.submission_status.number_of_records else 0 # pylint: disable=C0301 - ]) + summary.append( + [ + "", + "Total Number of Records Processed", + ( + self.submission_status.number_of_records + if self.submission_status.number_of_records + else 0 + ), # pylint: disable=C0301 + ] + ) summary.append(["", ""])