diff --git a/application/web/web_main.py b/application/web/web_main.py index 77eee36e..7aa1095a 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -2,6 +2,7 @@ # silence mypy for the routes file import csv +import re from functools import wraps import json import logging @@ -810,9 +811,43 @@ def import_from_cre_csv() -> Any: if file is None: abort(400, "No file provided") contents = file.read() - csv_read = csv.DictReader(contents.decode("utf-8").splitlines()) try: - documents = spreadsheet_parsers.parse_export_format(list(csv_read)) + contents_text = contents.decode("utf-8") + except UnicodeDecodeError: + abort(400, "Invalid CSV encoding. Expected UTF-8.") + + if '"""' in contents_text: + abort( + 400, + "Invalid CSV content: triple-quoted text detected. Use standard CSV quoting (double quotes) instead.", + ) + + try: + csv_read = csv.DictReader(contents_text.splitlines()) + rows = list(csv_read) + except csv.Error as exc: + abort(400, f"Invalid CSV content: {exc}") + + cre_headers = [ + header for header in (csv_read.fieldnames or []) if header.startswith("CRE") + ] + cre_value_pattern = re.compile(r"^[A-Za-z0-9]{3}-[A-Za-z0-9]{3}\|.+$") + + for row_index, row in enumerate(rows, start=2): + for header in cre_headers: + value = row.get(header) + if not spreadsheet_parsers.is_empty(value): + normalized = str(value).strip() + if not cre_value_pattern.match(normalized): + abort( + 400, + "Invalid CRE column format in row " + f"{row_index}, column '{header}'. Expected XXX-XXX|Name.", + ) + try: + documents = spreadsheet_parsers.parse_export_format(rows) + except ValueError as err: + abort(400, f"Invalid CSV content: {err}") except cre_exceptions.DuplicateLinkException as dle: abort(500, f"error during parsing of the incoming CSV, err:{dle}") cres = documents.pop(defs.Credoctypes.CRE.value)