Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions src/dve/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
from dve.parser.file_handling.service import _get_implementation
from dve.pipeline.utils import SubmissionStatus, deadletter_file, load_config, load_reader
from dve.reporting.constants import ErrorReportStatus
from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates

PERMISSIBLE_EXCEPTIONS: tuple[type[Exception]] = (
Expand Down Expand Up @@ -223,7 +224,7 @@ def write_file_to_parquet(
submission_file_uri,
model_name,
stringify_model(model), # type: ignore
get_all_model_fields(models.values()) # type: ignore
get_all_model_fields(models.values()), # type: ignore
),
f"{out}{model_name}",
)
Expand Down Expand Up @@ -764,8 +765,8 @@ def _get_error_dataframes(self, submission_id: str):
pl.when(pl.col("Status") == pl.lit("informational"))
.then(pl.lit("Warning"))
.when(pl.col("FailureType") == pl.lit("submission")) # type: ignore
.then(pl.lit("Submission Failure")) # type: ignore
.otherwise(pl.lit("Record Rejection")) # type: ignore
.then(pl.lit(ErrorReportStatus.FILE_REJECTION.reporting_name)) # type: ignore
.otherwise(pl.lit(ErrorReportStatus.RECORD_REJECTION.reporting_name)) # type: ignore
.alias("error_type") # type: ignore
)
df = df.select(
Expand Down Expand Up @@ -828,9 +829,13 @@ def error_report(
sub_stats = SubmissionStatisticsRecord(
submission_id=submission_info.submission_id,
record_count=submission_status.number_of_records,
number_submission_rejections=err_types.get("Submission Failure", 0),
number_record_rejections=err_types.get("Record Rejection", 0),
number_warnings=err_types.get("Warning", 0),
number_submission_rejections=err_types.get(
ErrorReportStatus.FILE_REJECTION.reporting_name, 0
),
number_record_rejections=err_types.get(
ErrorReportStatus.RECORD_REJECTION.reporting_name, 0
),
number_warnings=err_types.get(ErrorReportStatus.WARNING.reporting_name, 0),
)

summary_dict = {
Expand All @@ -841,7 +846,7 @@ def error_report(
summary_items = er.SummaryItems(
submission_status=submission_status,
summary_dict=summary_dict,
row_headings=["Submission Failure", "Record Rejection", "Warning"],
row_headings=[e.reporting_name for e in ErrorReportStatus],
)

workbook = er.ExcelFormat(
Expand Down
22 changes: 22 additions & 0 deletions src/dve/reporting/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
Constants used within the error reports
"""

from enum import Enum


class ErrorReportStatus(Enum):
"""
Constant to centrally hold error report status.
"""

FILE_REJECTION = 1, "File Rejection"
RECORD_REJECTION = 2, "Record Rejection"
WARNING = 3, "Warning"

@property
def reporting_name(self):
"""
The error report 'friendly' name.
"""
return self.value[1]
3 changes: 2 additions & 1 deletion src/dve/reporting/error_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dve.common.error_utils import conditional_cast
from dve.core_engine.message import FeedbackMessage
from dve.parser.file_handling.service import open_stream
from dve.reporting.constants import ErrorReportStatus

ERROR_SCHEMA = {
"Table": Utf8(),
Expand Down Expand Up @@ -85,7 +86,7 @@ def create_error_dataframe(errors: deque[FeedbackMessage], key_fields):

df = df.with_columns( # type: ignore
pl.when(pl.col("Status") == pl.lit("error")) # type: ignore
.then(pl.lit("Submission Failure")) # type: ignore
.then(pl.lit(ErrorReportStatus.FILE_REJECTION.reporting_name)) # type: ignore
.otherwise(pl.lit("Warning")) # type: ignore
.alias("error_type")
)
Expand Down
21 changes: 14 additions & 7 deletions src/dve/reporting/excel_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from polars.exceptions import ColumnNotFoundError

from dve.pipeline.utils import SubmissionStatus
from dve.reporting.constants import ErrorReportStatus


@dataclass
Expand Down Expand Up @@ -97,9 +98,9 @@ def get_submission_status(self, aggregates: DataFrame) -> str:
if aggregates.is_empty():
return "File has been accepted, no issues to report"
failures = aggregates["Type"].unique()
if "Submission Failure" in failures:
if ErrorReportStatus.FILE_REJECTION.reporting_name in failures:
status = "File has been rejected"
elif "Warning" in failures:
elif ErrorReportStatus.WARNING.reporting_name in failures:
status = "File has been accepted, all records accepted with warnings"
else:
status = "File has been accepted, no issues to report"
Expand Down Expand Up @@ -141,11 +142,17 @@ def _add_submission_info(self, status: str, summary: Worksheet):
for key, value in self.summary_dict.items():
summary.append(["", _key_renames.get(key, key), str(value)])

summary.append([
"",
"Total Number of Records Processed",
self.submission_status.number_of_records if self.submission_status.number_of_records else 0 # pylint: disable=C0301
])
summary.append(
[
"",
"Total Number of Records Processed",
(
self.submission_status.number_of_records
if self.submission_status.number_of_records
else 0
), # pylint: disable=C0301
]
)
summary.append(["", ""])


Expand Down
2 changes: 1 addition & 1 deletion tests/test_pipeline/test_spark_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ def test_error_report_where_report_is_expected( # pylint: disable=redefined-out
("File Name", "doesnotmatter"),
("File Extension", "json"),
("Total Number of Records Processed", "9"),
("Submission Failure", "0"),
("File Rejection", "0"),
("Record Rejection", "2"),
("Warning", "0"),
]
Expand Down
Loading