From 5ae9b39e47406066266d9af8615c5f69c1c74003 Mon Sep 17 00:00:00 2001 From: Gabriela de Santana Carrara Date: Mon, 27 Apr 2026 13:33:37 +0200 Subject: [PATCH 1/2] pipeline: Add XML production and CERNBox upload steps * Generate per-Boite and combined XMLs with flexible output paths * Implement upload_to_cernbox for syncing results and logs * Support temporary directories * ref cern-sis/digitization#19, cern-sis/digitization#23 --- refactory/README.md | 74 +++++++++----- refactory/cli.py | 134 ++++++++++++++++--------- refactory/file_import/boite_matcher.py | 2 +- refactory/file_import/xml_exporter.py | 90 +++++++++++++++++ refactory/storage_connection.py | 62 ++++++++++-- 5 files changed, 281 insertions(+), 81 deletions(-) create mode 100644 refactory/file_import/xml_exporter.py diff --git a/refactory/README.md b/refactory/README.md index 9aab013..e7b8cda 100644 --- a/refactory/README.md +++ b/refactory/README.md @@ -1,18 +1,18 @@ # refactory -This directory contains tools for validating PDF files and matching Boite Excel inventory records against S3 files. +This directory contains tools for validating PDF files, matching Boite Excel inventory records against S3 files, and optionally exporting the results to XML (FFT) for CDS upload. ## Structure - `cli.py` - click CLI exposing the main workflows: - `validate-files-integrity` - - `file-match` + - `match-and-export` - `storage_connection.py` - storage provider abstraction: - `S3Provider` for S3. - - `CernboxProvider` for public CERNBox access. + - `CernboxProvider` for public/authenticated CERNBox access. - `check_files/main.py` - validation pipeline used by `validate-files-integrity`. -- `file_import/refactory_matcher.py` - Boite-to-S3 matcher implementation used by `file-match`. -- `file_import/boite_matcher.py` - additional matcher implementation and helpers. +- `file_import/boite_matcher.py` - Boite-to-S3 matcher implementation used by `match-and-export`. +- `file_import/xml_exporter.py` - XML generator (FFT) used for CDS batch uploads. ## CLI usage @@ -25,7 +25,9 @@ poetry run digitization_v2 --help The available commands are: - `validate-files-integrity` — validate PDF integrity and inventory alignment. -- `file-match` — match Boite Excel records against S3 files and generate JSON outputs. +- `match-and-export` — match Boite Excel records against S3 files, generate JSON outputs, and optionally export/upload XMLs. + +--- ## 1. Validate files integrity @@ -38,45 +40,58 @@ poetry run digitization_v2 validate-files-integrity \ -b digitization-dev ``` -Options: +**Options:** - `-d, --data-source` — Boite inventory source. Supports a CERNBox hash, range (`1..10`), or list (`[1,2]`). - `-u, --upload-reports` — upload validation reports back to storage. - `-b, --bucket` — S3 bucket name (default: `digitization-dev`). +- `-p, --base-path` — Base S3 path (default: `cern-archives/raw/PDF/`). This command runs the validation pipeline and generates logs such as `s3_pdf_issues.log`. -## 2. Boite-to-S3 file matching +--- + +## 2. Match and Export (Boite-to-S3) -Use this command to match Boite Excel filenames with S3 objects and write structured JSON output. +Use this command to match Boite Excel filenames with S3 objects, write structured JSON outputs, and optionally generate and upload XML files for CDS. 
 ```bash
-poetry run digitization_v2 file-match \
-  -d "https://cernbox.cern.ch/s/{hash}" \
-  -o ./match_results \
+poetry run digitization_v2 match-and-export \
+  -d "https://cernbox.cern.ch/s/{hash}" \
+  -p "cern-archives/raw/" \
+  -o ./results \
   -f PDF,PDF_LATEX \
-  -b digitization-dev
+  -b digitization-dev \
+  -x \
+  -c
 ```

-Options:
+**Options:**

 - `-d, --data-source` — local directory or CERNBox URL containing `.xlsx` Boite files.
-- `-o, --output-path` — output directory for JSON results (default: `./match_results`).
+- `-p, --base-path` — Base S3 path (default: `cern-archives/raw/`).
+- `-o, --output-path` — output directory for JSON/XML results (default: `./results`).
 - `-f, --file-types` — comma-separated list of file types to match (default: `PDF,PDF_LATEX`).
 - `-b, --bucket` — S3 bucket name (default: `digitization-dev`).
+- `-x, --generate-xml` — Generate XML files (FFT) for CDS upload.
+- `-c, --upload-cernbox` — Upload the generated XML files to CERNBox.
+- `--cernbox-path` — Target folder inside CERNBox for XML uploads (default: `xml_exports`).
+
+### Matcher & Export behavior

-### Matcher behavior
+The `match-and-export` flow:

-The `file-match` flow:
+1. **Downloads** `.xlsx` Boite files from CERNBox if a URL is provided.
+2. **Reads** each Boite file and extracts the record ID and filename columns.
+3. **Searches** S3 under `<base_path>/<file_type>/<boite_folder>/`.
+4. **Matches** filenames case-insensitively. Supports both flat and subfolder layouts:
+   - *Flat:* `raw/PDF_LATEX/BOITE_O0125/ISR-LEP-RF-GG-ps.pdf`
+   - *Nested:* `raw/PDF/BOITE_O0125/LEP-RF-SH-ps/LEP-RF-SH-ps.pdf`
+5. **Generates** unified mismatch logs in JSON format for missing Boite rows and extra S3 files.
+6. **(Optional) Exports** matching records to XML files if the `-x` flag is used.
+7. **(Optional) Uploads** the generated XMLs to a specified path in CERNBox if the `-c` flag is used.

-- downloads `.xlsx` Boite files from CERNBox if a URL is provided.
-- reads each Boite file and extracts the record ID and filename columns.
-- searches S3 under `raw/<file_type>/<boite_folder>/`.
-- matches filenames case-insensitively.
-- supports both flat and subfolder layouts:
-  - flat: `raw/PDF_LATEX/BOITE_O0125/ISR-LEP-RF-GG-ps.pdf`
-  - nested: `raw/PDF/BOITE_O0125/LEP-RF-SH-ps/LEP-RF-SH-ps.pdf`
-- writes unified mismatch logs in JSON format for missing Boite rows and extra S3 files.
+---

 ## Dependencies
@@ -93,6 +108,9 @@ poetry install

 - `boto3`
 - `requests`
 - `pypdf`
+- `click`
+
+---

 ## AWS Authentication
@@ -116,6 +134,8 @@ export SECRET_KEY="YOUR_SECRET_KEY"

 > `S3Provider` also supports the default endpoint `https://s3.cern.ch`, configured in `storage_connection.py`.

+---
+
 ## CERNBox Authentication

 `CernboxProvider` reads optional credentials from environment variables:
@@ -130,8 +150,10 @@ export CERNBOX_USER="your_username"
 export CERNBOX_PASSWORD="your_password"
 ```

+---
+
 ## Notes

-- `file_import/refactory_matcher.py` is the primary matcher used by `file-match`.
+- `file_import/boite_matcher.py` is the primary matcher used by `match-and-export`.
 - `test_connections.py` can be used to verify storage connectivity before running either workflow.
 - Use `poetry run digitization_v2 --help` to verify command names and options at runtime.
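Before the code, it helps to pin down what the exporter actually writes. A minimal sketch, assuming the package layout in this patch: the record dict is hand-built (ID and URL are invented; real dicts come from `BoiteS3Matcher`), and the commented report and XML shapes follow `generate_batch` and `_build_record_element` in the `xml_exporter.py` diff below.

```python
# Sketch only: exercises the XMLExporter added by this patch.
# The record dict is hand-made; IDs and URLs are invented.
from refactory.file_import.xml_exporter import XMLExporter

exporter = XMLExporter(output_path="./results/xml_exports")
report = exporter.generate_batch(
    {"BOITE_O0125.xlsx": [{"record_id": "123456", "pdf_url": "https://s3.cern.ch/..."}]}
)
# report is roughly:
# {"output_path": "results/xml_exports",
#  "files": ["results/xml_exports/BOITE_O0125.xml"],
#  "combined": "results/xml_exports/Boites_combined.xml"}
#
# Each record becomes one FFT datafield per resolved URL, pretty-printed as:
# <collection>
#   <record>
#     <controlfield tag="001">123456</controlfield>
#     <datafield tag="FFT" ind1=" " ind2=" ">
#       <subfield code="a">https://s3.cern.ch/...</subfield>
#       <subfield code="t">Main</subfield>
#       <subfield code="d">Fulltext PDF</subfield>
#     </datafield>
#   </record>
# </collection>
```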
diff --git a/refactory/cli.py b/refactory/cli.py index 4f62237..a663624 100644 --- a/refactory/cli.py +++ b/refactory/cli.py @@ -1,9 +1,12 @@ import click import ast -from .check_files.main import run_validation_pipeline -from refactory.storage_connection import S3Provider +import os +from pathlib import Path +from .check_files.main import run_validation_pipeline +from refactory.storage_connection import S3Provider, CernboxProvider from .file_import.boite_matcher import BoiteS3Matcher +from .file_import.xml_exporter import XMLExporter def parse_inventory(value): @@ -11,8 +14,9 @@ def parse_inventory(value): Parses the input to identify if it's a literal list, a range of IDs (1..10), or a single string/ID. """ - if value.isdigit(): + if isinstance(value, int) or value.isdigit(): return [int(value)] + if value.startswith("[") and value.endswith("]"): try: return ast.literal_eval(value) @@ -24,7 +28,10 @@ def parse_inventory(value): start, end = map(int, value.split("..")) return list(range(start, end + 1)) except ValueError: - pass + raise click.BadParameter( + "Invalid range format. Use 'start..end' (e.g., 1..10)" + ) + return value @@ -32,19 +39,18 @@ def parse_inventory(value): def digitization_v2(): pass - @digitization_v2.command("validate-files-integrity") @click.option( "-d", "--data-source", required=True, - help="Boite Files. Supports a CERNBOX hash, range 1..10, or list [1,2].", + help="Inventory source (CERNBOX hash, range 1..10, or list [1,2]).", ) @click.option( "-u", "--upload-reports", is_flag=True, - help="Upload validation reports back to the storage provider.", + help="Upload validation reports back to storage.", ) @click.option( "-b", @@ -58,14 +64,10 @@ def digitization_v2(): "--base-path", default="cern-archives/raw/PDF/", show_default=True, - help="Base S3 path to validate.", + help="Base S3 path.", ) def validate_files_integrity(data_source, base_path, bucket, upload_reports): - """ - Validates files integrity and inventory alignment. - This command checks for corrupted files and missing boxes. - """ - + """Validates files integrity and inventory alignment.""" inventory_input = parse_inventory(data_source) provider = S3Provider(bucket=bucket) @@ -77,38 +79,35 @@ def validate_files_integrity(data_source, base_path, bucket, upload_reports): data_source=inventory_input, upload_reports=upload_reports, ) - click.echo("Process finished. Check the generated logs for details.") + click.echo("Process finished. Check logs for details.") except Exception as e: click.secho(f"Error: {e}", fg="red", err=True) -@digitization_v2.command("file-match") +@digitization_v2.command("match-and-export") @click.option( - "-d", - "--data-source", - required=True, - help="Target data source. Supports a local directory path or a CERNBOX URL.", + "-d", "--data-source", required=True, help="Local directory path or CERNBOX URL." 
) @click.option( "-p", "--base-path", default="cern-archives/raw/", show_default=True, - help="Base S3 path to validate.", + help="Base S3 path.", ) @click.option( "-o", "--output-path", - default="./match_results", + default="./results", show_default=True, - help="Directory to save the generated JSON files (records and mismatches).", + help="Output directory.", ) @click.option( "-f", "--file-types", default="PDF,PDF_LATEX", show_default=True, - help="Comma-separated list of file types to match (e.g., 'PDF,PDF_LATEX,TIFF').", + help="Comma-separated file types.", ) @click.option( "-b", @@ -117,26 +116,39 @@ def validate_files_integrity(data_source, base_path, bucket, upload_reports): show_default=True, help="S3 Bucket name.", ) +@click.option( + "-x", + "--generate-xml", + is_flag=True, + help="Generate XML files (FFT) for CDS upload.", +) +@click.option( + "-c", "--upload-cernbox", is_flag=True, help="Upload XML files to CERNBox." +) +@click.option( + "--cernbox-path", + default="xml_exports", + show_default=True, + help="Target folder inside CERNBox.", +) +def match_and_export( + data_source, + base_path, + output_path, + file_types, + bucket, + generate_xml, + upload_cernbox, + cernbox_path, +): + """Matches Excel records against S3 and optionally exports to XML/CERNBox.""" + + os.makedirs(output_path, exist_ok=True) -def file_match(data_source, base_path, output_path, file_types, bucket): - """ - Matches Boite Excel records against S3 files and generates JSON payloads. - Generates a success JSON per Boite and a unified mismatch log. - """ - - CUSTOM_EXPIRATION = { - # Example: uncomment the line below to test it - # "PDF": 10, - # "PDF_LATEX": 45 - } - - provider = S3Provider(bucket=bucket, custom_expiration=CUSTOM_EXPIRATION) - + provider = S3Provider(bucket=bucket) parsed_file_types = [t.strip() for t in file_types.split(",")] - click.echo("Starting match process...") - click.echo(f"Source: {data_source}") - click.echo(f"File types: {', '.join(parsed_file_types)}") + click.echo(f"Starting process for: {data_source}") try: matcher = BoiteS3Matcher( @@ -147,13 +159,45 @@ def file_match(data_source, base_path, output_path, file_types, bucket): file_types=parsed_file_types, ) - matcher.execute() + results_map = matcher.execute() + click.secho(f"Match completed. Results in: {output_path}", fg="green") - click.secho( - f"Match completed successfully. 
Output saved to: {output_path}", fg="green" - ) + if generate_xml: + if not results_map: + click.secho("No valid records found to generate XML.", fg="yellow") + return + + xml_output_folder = os.path.join(output_path, "xml_exports") + os.makedirs(xml_output_folder, exist_ok=True) + + exporter = XMLExporter(output_path=xml_output_folder) + report = exporter.generate_batch(results_map) + + click.secho(f"✅ XMLs generated in: {xml_output_folder}", fg="green") + + if upload_cernbox and report: + _handle_cernbox_upload(report, cernbox_path) + + except Exception as e: + click.secho(f"Critical Error: {e}", fg="red", err=True) + + +def _handle_cernbox_upload(report, remote_path): + try: + cernbox = CernboxProvider() + files = report.get("files", []).copy() + if report.get("combined"): + files.append(report["combined"]) + + for local_file in files: + file_name = Path(local_file).name + target = f"{remote_path.strip('/')}/{file_name}" + cernbox.upload_file(local_file_path=local_file, remote_file_path=target) + click.echo(f" -> Uploaded: {file_name}") + + click.secho("CERNBox sync complete.", fg="green") except Exception as e: - click.secho(f"Error during matching: {e}", fg="red", err=True) + click.secho(f"CERNBox Error: Failed to process '{file_name}'. Details: {e}", fg="red", err=True) if __name__ == "__main__": diff --git a/refactory/file_import/boite_matcher.py b/refactory/file_import/boite_matcher.py index b9954d2..0a63879 100644 --- a/refactory/file_import/boite_matcher.py +++ b/refactory/file_import/boite_matcher.py @@ -23,7 +23,7 @@ def __init__( """Initializes the matcher with storage, data output, data path, and target file types.""" self.provider = provider self.base_path = Path(base_path) - self.output_path = Path(output_path) + self.output_path = Path(output_path)/'logs' self.output_path.mkdir(parents=True, exist_ok=True) self.file_types = file_types or ["PDF", "PDF_LATEX"] self.data_path = self._prepare_data_path(data_source) diff --git a/refactory/file_import/xml_exporter.py b/refactory/file_import/xml_exporter.py new file mode 100644 index 0000000..76233f9 --- /dev/null +++ b/refactory/file_import/xml_exporter.py @@ -0,0 +1,90 @@ +import xml.etree.ElementTree as ET +from xml.dom import minidom +import tempfile +from pathlib import Path + + +class XMLExporter: + """Handles XML generation and file persistence logic.""" + + def __init__(self, output_path: str | None = None): + + if output_path: + self.base_dir = Path(output_path) + self.base_dir.mkdir(parents=True, exist_ok=True) + else: + self.base_dir = Path(tempfile.mkdtemp(prefix="boite_xmls_")) + + def _build_record_element(self, root: ET.Element, record: dict) -> None: + record_node = ET.SubElement(root, "record") + ET.SubElement(record_node, "controlfield", tag="001").text = str( + record.get("record_id", "") + ) + + if record.get("pdf_url"): + df = ET.SubElement(record_node, "datafield", tag="FFT", ind1=" ", ind2=" ") + ET.SubElement(df, "subfield", code="a").text = record["pdf_url"] + ET.SubElement(df, "subfield", code="t").text = "Main" + ET.SubElement(df, "subfield", code="d").text = "Fulltext PDF" + + if record.get("pdf_latex_url"): + df = ET.SubElement(record_node, "datafield", tag="FFT", ind1=" ", ind2=" ") + ET.SubElement(df, "subfield", code="a").text = record["pdf_latex_url"] + ET.SubElement(df, "subfield", code="t").text = "Main" + ET.SubElement(df, "subfield", code="d").text = "Fulltext PDF_LaTeX" + + def _save_to_disk(self, root: ET.Element, filename: str) -> str: + """Converts element tree to XML file.""" + 
rough_string = ET.tostring(root, encoding="utf-8")
+        pretty_xml = minidom.parseString(rough_string).toprettyxml(indent="  ")
+
+
+        file_path = self.base_dir / filename
+        file_path.write_text(pretty_xml, encoding="utf-8")
+        return str(file_path)
+
+    def generate_single(self, records: list[dict], filename: str) -> str | None:
+        """Generates an XML file for a single Boite file."""
+        root = ET.Element("collection")
+        valid_records_count = 0
+
+        for rec in records:
+            if not rec.get("pdf_url") and not rec.get("pdf_latex_url"):
+                continue
+
+            self._build_record_element(root, rec)
+            valid_records_count += 1
+
+        if valid_records_count == 0:
+            print(f"Skipped {filename}: no valid files found.")
+            return None
+
+        return self._save_to_disk(root, filename)
+
+    def generate_batch(self, results_map: dict[str, list[dict]]) -> dict:
+        """Batch generates individual XMLs and a combined output from boite files."""
+        output_report = {
+            "output_path": str(self.base_dir),
+            "files": [],
+            "combined": None,
+        }
+
+        all_records_combined = []
+
+        for boite_file, records in results_map.items():
+            if not records:
+                continue
+
+            xml_name = str(Path(boite_file).with_suffix(".xml"))
+            saved_file_path = self.generate_single(records, xml_name)
+
+            if saved_file_path:
+                output_report["files"].append(saved_file_path)
+                all_records_combined.extend(records)
+
+        if all_records_combined:
+            output_report["combined"] = self.generate_single(
+                all_records_combined, "Boites_combined.xml"
+            )
+
+        return output_report
diff --git a/refactory/storage_connection.py b/refactory/storage_connection.py
index 074b6f9..e169ab7 100644
--- a/refactory/storage_connection.py
+++ b/refactory/storage_connection.py
@@ -130,11 +130,24 @@ def __init__(self, public_link_hash: str = None):
         )
         self.auth = (self.account, self.password)

+    def _build_eos_path(self, path: str) -> str:
+
+        clean_path = path.lstrip("/")
+
+        if clean_path.startswith("eos/"):
+            return clean_path
+
+        if self.account and not self.is_public:
+            initial = self.account[0].lower()
+            return f"eos/user/{initial}/{self.account}/{clean_path}"
+
+        return clean_path
+
     def _propfind(self, path: str, depth: str = "1") -> list[str]:
+        eos_path = self._build_eos_path(path)
+        url = f"{self.base_url}/{eos_path}/" if eos_path else f"{self.base_url}/"

-        url = f"{self.base_url}/{path}/" if path else f"{self.base_url}/"
         headers = {"Depth": depth}
-
         response = requests.request("PROPFIND", url, headers=headers, auth=self.auth)
         response.raise_for_status()

@@ -145,7 +158,6 @@ def _propfind(self, path: str, depth: str = "1") -> list[str]:
     for response_tag in root.findall("d:response", namespaces)[1:]:
         href = response_tag.find("d:href", namespaces).text
         filename = href.rstrip("/").split("/")[-1]
-
         paths.append(filename)

     return paths
@@ -160,7 +172,8 @@ def list_files(self, folder_path: str, extension: str = None) -> list[str]:
         return all_items

     def download_to_temp(self, file_path: str, temp_file_path: str) -> None:
-        url = f"{self.base_url}/{file_path}"
+        eos_path = self._build_eos_path(file_path)
+        url = f"{self.base_url}/{eos_path}"
         response = requests.get(url, stream=True, auth=self.auth)
         response.raise_for_status()

@@ -168,18 +181,49 @@ def download_to_temp(self, file_path: str, temp_file_path: str) -> None:
         for chunk in response.iter_content(chunk_size=8192):
             f.write(chunk)

+    def create_folder(self, folder_path: str) -> None:
+
+        if self.is_public or not self.account or not self.password:
+            raise ValueError("Error: CERN credentials required to create folders.")
+
+        eos_path = self._build_eos_path(folder_path)
+        url = f"{self.base_url}/{eos_path}/"
f"{self.base_url}/{eos_path}/" + + response = requests.request("MKCOL", url, auth=self.auth) + + if response.status_code not in (201, 405): + + response.raise_for_status() + def upload_file(self, local_file_path: str, remote_file_path: str) -> None: - if self.is_public: - raise NotImplementedError("Error: CERN credentials required for updates.") + """Faz o upload. Se a pasta não existir, o servidor retornará 409.""" + if self.is_public or not self.account or not self.password: + raise ValueError( + "Error: CERN account and password are required for uploading." + ) - clean_remote_path = remote_file_path.strip("/") - url = f"{self.base_url}/{clean_remote_path}" + eos_path = self._build_eos_path(remote_file_path) + url = f"{self.base_url}/{eos_path}" with open(local_file_path, "rb") as f: response = requests.put(url, data=f, auth=self.auth) + if response.status_code == 409: + clean_remote_path = remote_file_path.strip("/") + parent_dir = "/".join(clean_remote_path.split("/")[:-1]) + + if parent_dir: + self.create_folder(parent_dir) + + with open(local_file_path, "rb") as retry_f: + retry_response = requests.put(url, data=retry_f, auth=self.auth) + + retry_response.raise_for_status() + return + response.raise_for_status() def generate_presigned_url( self, file_key: str, content_type: str = None, expiration: int = None ) -> str: - return f"{self.base_url}/{file_key}" + eos_path = self._build_eos_path(file_key) + return f"{self.base_url}/{eos_path}" From 0b154273eaaeda125fa1c235615ebf40c29378b6 Mon Sep 17 00:00:00 2001 From: Gabriela de Santana Carrara Date: Thu, 7 May 2026 11:16:02 +0200 Subject: [PATCH 2/2] digitization(matcher): Enhance file matching logic and address PR feedback * Support multiple S3 roots, priorities, and extended file types * Add dry-run mode, run summary metrics, and detailed reporting * Incorporate fixes and improvements from previous PR review * ref cern-sis/digitization#21 --- refactory/README.md | 25 ++-- refactory/cli.py | 82 +++++++++++-- refactory/file_import/boite_matcher.py | 161 +++++++++++++++---------- refactory/file_import/xml_exporter.py | 36 ++++-- refactory/storage_connection.py | 3 - refactory/test_connections.py | 60 --------- 6 files changed, 206 insertions(+), 161 deletions(-) delete mode 100644 refactory/test_connections.py diff --git a/refactory/README.md b/refactory/README.md index e7b8cda..2576b4f 100644 --- a/refactory/README.md +++ b/refactory/README.md @@ -57,11 +57,12 @@ Use this command to match Boite Excel filenames with S3 objects, write structure ```bash poetry run digitization_v2 match-and-export \ - -d "[https://cernbox.cern.ch/s/](https://cernbox.cern.ch/s/){hash}" \ - -p "cern-archives/raw/" \ + -d "https://cernbox.cern.ch/s/{hash}" \ + -p "cern-archives/raw/CORRECTIONS_2,cern-archives/raw/" \ -o ./results \ - -f PDF,PDF_LATEX \ + -f PDF, PDF_LATEX \ -b digitization-dev \ + -r \ -x \ -c ``` @@ -69,10 +70,12 @@ poetry run digitization_v2 match-and-export \ **Options:** - `-d, --data-source` — local directory or CERNBox URL containing `.xlsx` Boite files. -- `-p, --base-path` — Base S3 path (default: `cern-archives/raw/`). +- `-p, --base-paths` — Comma-separated base S3 paths. Order defines priority (e.g., `CORRECTIONS_2` overrides standard `raw` folders) (default: `cern-archives/raw/`). - `-o, --output-path` — output directory for JSON/XML results (default: `./results`). - `-f, --file-types` — comma-separated list of file types to match (default: `PDF,PDF_LATEX`). - `-b, --bucket` — S3 bucket name (default: `digitization-dev`). 
@@ -83,12 +86,13 @@ The `match-and-export` flow:

 1. **Downloads** `.xlsx` Boite files from CERNBox if a URL is provided.
 2. **Reads** each Boite file and extracts the record ID and filename columns.
-3. **Searches** S3 under `<base_path>/<file_type>/<boite_folder>/`.
-4. **Matches** filenames case-insensitively. Supports both flat and subfolder layouts:
-   - *Flat:* `raw/PDF_LATEX/BOITE_O0125/ISR-LEP-RF-GG-ps.pdf`
-   - *Nested:* `raw/PDF/BOITE_O0125/LEP-RF-SH-ps/LEP-RF-SH-ps.pdf`
-5. **Generates** unified mismatch logs in JSON format for missing Boite rows and extra S3 files.
-6. **(Optional) Exports** matching records to XML files if the `-x` flag is used.
+3. **Searches** S3 under `<base_path>/<file_type>/<boite_folder>/`. If multiple base paths are provided, it respects **priority mapping** (duplicates are avoided by prioritizing earlier paths).
+4. **Matches** filenames case-insensitively. Supports:
+   - *Flat layouts:* `raw/PDF_LATEX/BOITE_O0125/ISR-LEP-RF-GG-ps.pdf`
+   - *Nested subfolders:* `raw/PDF/BOITE_O0125/LEP-RF-SH-ps/LEP-RF-SH-ps.pdf`
+   - *Multi-page grouping:* Automatically groups multiple files (e.g., sequential TIFFs like `_001`, `_002`) under a single record ID.
+5. **Generates** unified mismatch logs in JSON format for missing Boite rows and extra S3 files, and calculates match/unmatch metrics per file.
+6. **(Optional) Exports** matching records to XML files if the `-x` flag is used. Generates XML `<datafield tag="FFT">` nodes dynamically based on all resolved file types (PDFs, TIFFs, OCRs).
 7. **(Optional) Uploads** the generated XMLs to a specified path in CERNBox if the `-c` flag is used.

 ---
@@ -155,5 +159,4 @@ export CERNBOX_PASSWORD="your_password"
 ## Notes

 - `file_import/boite_matcher.py` is the primary matcher used by `match-and-export`.
-- `test_connections.py` can be used to verify storage connectivity before running either workflow.
 - Use `poetry run digitization_v2 --help` to verify command names and options at runtime.
diff --git a/refactory/cli.py b/refactory/cli.py
index a663624..6764908 100644
--- a/refactory/cli.py
+++ b/refactory/cli.py
@@ -39,6 +39,7 @@ def parse_inventory(value):
 def digitization_v2():
     pass

+
 @digitization_v2.command("validate-files-integrity")
 @click.option(
     "-d",
@@ -90,10 +91,10 @@ def validate_files_integrity(data_source, base_path, bucket, upload_reports):
 )
 @click.option(
     "-p",
-    "--base-path",
+    "--base-paths",
     default="cern-archives/raw/",
     show_default=True,
-    help="Base S3 path.",
+    help="Comma-separated base S3 paths (e.g., cern-archives/raw/CORRECTIONS_2,cern-archives/raw/).",
 )
 @click.option(
     "-o",
@@ -131,21 +132,34 @@ def validate_files_integrity(data_source, base_path, bucket, upload_reports):
     show_default=True,
     help="Target folder inside CERNBox.",
 )
+@click.option(
+    "--dry-run",
+    is_flag=True,
+    help="Stop script execution after matching. 
No XML or uploads will occur.", +) +@click.option( + "-r", + "--report", + is_flag=True, + help="Display detailed summary metrics and listed files in the console.", +) def match_and_export( data_source, - base_path, + base_paths, output_path, file_types, bucket, generate_xml, upload_cernbox, cernbox_path, + dry_run, + report, ): """Matches Excel records against S3 and optionally exports to XML/CERNBox.""" - os.makedirs(output_path, exist_ok=True) provider = S3Provider(bucket=bucket) + parsed_base_paths = [p.strip() for p in base_paths.split(",")] parsed_file_types = [t.strip() for t in file_types.split(",")] click.echo(f"Starting process for: {data_source}") @@ -153,14 +167,54 @@ def match_and_export( try: matcher = BoiteS3Matcher( provider=provider, - base_path=base_path, + base_paths=parsed_base_paths, data_source=data_source, output_path=output_path, file_types=parsed_file_types, ) - results_map = matcher.execute() - click.secho(f"Match completed. Results in: {output_path}", fg="green") + results_map, all_mismatches = matcher.execute() + + total_records = sum(m["metrics"]["total_records"] for m in all_mismatches) + total_matched = sum(m["metrics"]["total_matched"] for m in all_mismatches) + total_unmatched = sum(m["metrics"]["total_unmatched"] for m in all_mismatches) + + if report: + click.secho("\n=== RUN SUMMARY METRICS ===", fg="cyan", bold=True) + click.echo(f"Total Records Processed : {total_records}") + click.secho(f"Total Matched : {total_matched}", fg="green") + click.secho(f"Total Unmatched : {total_unmatched}", fg="red") + + click.secho("\n--- Detailed File Matches ---", fg="cyan", bold=True) + for box_data in all_mismatches: + box_file = box_data["boite_file"] + mets = box_data["metrics"] + + click.echo( + f"📦 {box_file}: {mets['total_matched']} matched | {mets['total_unmatched']} unmatched" + ) + + if box_data["mismatches"]["in_boite_missing_in_s3"]: + click.secho(" [Missing Records]", fg="yellow") + for missing in box_data["mismatches"]["in_boite_missing_in_s3"]: + missing_types = ", ".join(missing["missing_types"]) + click.echo( + f" - ID: {missing['record_id']} (Missing: {missing_types})" + ) + click.echo("===========================\n") + else: + click.secho( + f"Match completed. [Matched: {total_matched} | Unmatched: {total_unmatched}]", + fg="green", + ) + + if dry_run: + click.secho( + "⚠️ Dry-run mode active. Stopping execution before XML generation.", + fg="yellow", + bold=True, + ) + return if generate_xml: if not results_map: @@ -171,12 +225,12 @@ def match_and_export( os.makedirs(xml_output_folder, exist_ok=True) exporter = XMLExporter(output_path=xml_output_folder) - report = exporter.generate_batch(results_map) + report_data = exporter.generate_batch(results_map) click.secho(f"✅ XMLs generated in: {xml_output_folder}", fg="green") - if upload_cernbox and report: - _handle_cernbox_upload(report, cernbox_path) + if upload_cernbox and report_data: + _handle_cernbox_upload(report_data, cernbox_path) except Exception as e: click.secho(f"Critical Error: {e}", fg="red", err=True) @@ -184,8 +238,10 @@ def match_and_export( def _handle_cernbox_upload(report, remote_path): try: + file_name = None cernbox = CernboxProvider() files = report.get("files", []).copy() + if report.get("combined"): files.append(report["combined"]) @@ -197,7 +253,11 @@ def _handle_cernbox_upload(report, remote_path): click.secho("CERNBox sync complete.", fg="green") except Exception as e: - click.secho(f"CERNBox Error: Failed to process '{file_name}'. 
Details: {e}", fg="red", err=True) + click.secho( + f"CERNBox Error: Failed to process '{file_name}'. Details: {e}", + fg="red", + err=True, + ) if __name__ == "__main__": diff --git a/refactory/file_import/boite_matcher.py b/refactory/file_import/boite_matcher.py index 0a63879..40960be 100644 --- a/refactory/file_import/boite_matcher.py +++ b/refactory/file_import/boite_matcher.py @@ -10,91 +10,106 @@ class BoiteS3Matcher: - """Matches Boite Excel records with S3 files and logs discrepancies.""" - def __init__( self, provider: StorageProvider, - base_path: str, + base_paths: list[str] | str, data_source: str, output_path: str, file_types: list[str] | None = None, ): - """Initializes the matcher with storage, data output, data path, and target file types.""" self.provider = provider - self.base_path = Path(base_path) - self.output_path = Path(output_path)/'logs' + self.base_paths = base_paths if isinstance(base_paths, list) else [base_paths] + self.output_path = Path(output_path) / "logs" self.output_path.mkdir(parents=True, exist_ok=True) - self.file_types = file_types or ["PDF", "PDF_LATEX"] + self.file_types = file_types or [ + "PDF", + "PDF_LATEX" + ] self.data_path = self._prepare_data_path(data_source) def _is_url(self, value: str) -> bool: return urlparse(value).scheme in {"http", "https"} def _prepare_data_path(self, data_source: str) -> Path: - """Returns the local path or delegates the download if a URL is provided.""" if self._is_url(data_source): return Path(fetch_boite_files(data_source)) return Path(data_source) - def _get_base_filename(self, filename: str) -> str: - """Strips file extensions and returns a clean, lowercase base name for exact matching.""" + def _get_base_filename(self, filename: str, ftype: str = "") -> str: lower_name = filename.lower() if lower_name.endswith("_latex.pdf"): return lower_name[:-10] - if lower_name.endswith((".pdf",".tiff", ".tif")): - return lower_name.rsplit(".", 1)[0] + if "." 
in lower_name: + lower_name = lower_name.rsplit(".", 1)[0] + + if ftype == "TIFF": + lower_name = re.sub(r"_\d{1,4}$", "", lower_name) + return lower_name def _normalize_for_comparison(self, name: str) -> str: - """Removes all non-alphanumeric characters for fuzzy matching and review suggestions.""" return re.sub(r"[^a-z0-9]", "", name.lower()) def _load_s3_cache_for_boite( self, box_file: str - ) -> tuple[dict[str, dict[str, str]], dict[str, set[str]]]: - """Pre-loads and filters S3 keys for match""" - cache: dict[str, dict[str, str]] = {} - available_keys: dict[str, set[str]] = {} + ) -> tuple[dict[str, dict[str, list[str]]], dict[str, set[str]]]: + cache: dict[str, dict[str, list[str]]] = {ft: {} for ft in self.file_types} + available_keys: dict[str, set[str]] = {ft: set() for ft in self.file_types} + mapped_roots: dict[str, dict[str, str]] = {ft: {} for ft in self.file_types} folder_pattern = re.compile(r"(?i:BOITE)[\-_]O0(\d+)(?:[\-_]\w+)?") match = folder_pattern.search(box_file) if not match: - print('No Boile file found.') - return {ft: {} for ft in self.file_types}, { - ft: set() for ft in self.file_types - } + return cache, available_keys target_number = match.group(1) for filetype in self.file_types: - prefix = f"{self.base_path}/{filetype}/BOITE_O0{target_number}" - all_raw_keys = self.provider.list_files(prefix) - - valid_keys: list[str] = [] - - for key in all_raw_keys: - if key.endswith("/"): - continue - - s3_match = folder_pattern.search(key) - - if s3_match and s3_match.group(1) == target_number: - valid_keys.append(key) - - cache[filetype] = { - self._get_base_filename(k.split("/")[-1]): k for k in valid_keys - } - available_keys[filetype] = set(valid_keys) + for base_path in self.base_paths: + prefix = f"{base_path}/{filetype}/BOITE_O0{target_number}".replace( + "\\", "/" + ).replace("//", "/") + all_raw_keys = self.provider.list_files(prefix) + + valid_keys: list[str] = [] + + for key in all_raw_keys: + if key.endswith("/"): + continue + + s3_match = folder_pattern.search(key) + if s3_match and s3_match.group(1) == target_number: + valid_keys.append(key) + available_keys[filetype].add(key) + + for key in valid_keys: + parts = key.split("/") + base_filename = self._get_base_filename(parts[-1], filetype) + + if base_filename not in cache[filetype]: + cache[filetype][base_filename] = [key] + mapped_roots[filetype][base_filename] = base_path + elif mapped_roots[filetype][base_filename] == base_path: + if key not in cache[filetype][base_filename]: + cache[filetype][base_filename].append(key) + + for key in valid_keys: + parts = key.split("/") + if len(parts) > 1: + folder_name = self._get_base_filename(parts[-2], filetype) + if "boite" not in folder_name: + if folder_name not in cache[filetype]: + cache[filetype][folder_name] = [key] + mapped_roots[filetype][folder_name] = base_path + elif mapped_roots[filetype][folder_name] == base_path: + if key not in cache[filetype][folder_name]: + cache[filetype][folder_name].append(key) return cache, available_keys - def process_boite( - self, box_file: str - ) -> tuple[list[dict], dict]: - """Processes a single Boite file in-memory and returns the mapped records alongside mismatch data.""" - print(f"📦 Processing {box_file}...") + def process_boite(self, box_file: str) -> tuple[list[dict], dict]: df = pd.read_excel(self.data_path / box_file, header=None) boite_name_s3 = transform_box_file_name(box_file) @@ -112,22 +127,33 @@ def process_boite( missing_types: list[str] = [] for ftype in self.file_types: - url_key = 
f"{ftype.lower()}_url" - matched_key = s3_cache[ftype].get(search_name) - - if matched_key: - content_type = ( - "application/pdf" if ftype in ["PDF", "PDF_LATEX"] else None - ) - record_data[url_key] = self.provider.generate_presigned_url( - matched_key, ftype, content_type - ) - used_s3_keys[ftype].add(matched_key) + matched_keys = s3_cache[ftype].get(search_name) + + if matched_keys: + matched_keys = sorted(matched_keys) + + if ftype == "TIFF": + for i, m_key in enumerate(matched_keys, start=1): + url_key = f"{ftype.lower()}_{i:03d}_url" + record_data[url_key] = self.provider.generate_presigned_url( + m_key, ftype, None + ) + else: + url_key = f"{ftype.lower()}_url" + content_type = ( + "application/pdf" if ftype in ["PDF", "PDF_LATEX"] else None + ) + record_data[url_key] = self.provider.generate_presigned_url( + matched_keys[0], ftype, content_type + ) + + used_s3_keys[ftype].update(matched_keys) else: + url_key = f"{ftype.lower()}_url" record_data[url_key] = None missing_types.append(ftype) - if missing_types: + if len(missing_types) == len(self.file_types): missing_in_s3.append( { "record_id": record_id, @@ -152,27 +178,35 @@ def process_boite( for s3_key in unused_s3: parts = s3_key.split("/") - s3_base = self._get_base_filename(parts[-1]) + s3_base = self._get_base_filename(parts[-1], ftype) s3_norm = self._normalize_for_comparison(s3_base) folder_norm = "" - if ftype == "PDF" and len(parts) > 1: + if len(parts) > 1: folder_norm = self._normalize_for_comparison(parts[-2]) - if boite_norm == s3_norm or (ftype == "PDF" and boite_norm == folder_norm): + if boite_norm == s3_norm or boite_norm == folder_norm: near_matches.append( { "boite_record": missing_rec["record_name"], "suggested_s3_key": s3_key, - "filetype": ftype + "filetype": ftype, } ) + total_records = len(records_data) + total_unmatched = len(missing_in_s3) + total_matched = total_records - total_unmatched + mismatch_data = { "boite_file": box_file, "s3_folder_name": boite_name_s3, - "total_in_boite_missing_in_s3": len(missing_in_boite), + "metrics": { + "total_records": total_records, + "total_matched": total_matched, + "total_unmatched": total_unmatched, + }, "mismatches": { "in_boite_missing_in_s3": missing_in_s3, "in_s3_missing_in_boite": missing_in_boite, @@ -183,7 +217,6 @@ def process_boite( return records_data, mismatch_data def _export_records(self, box_file: str, records: list) -> None: - """Saves Boite records to JSON.""" base_name = box_file.rsplit(".", 1)[0] with open( self.output_path / f"{base_name}_records.json", "w", encoding="utf-8" @@ -191,7 +224,6 @@ def _export_records(self, box_file: str, records: list) -> None: json.dump(records, f, indent=4, ensure_ascii=False) def _export_unified_log(self, all_mismatches: list) -> None: - """Saves consolidated mismatch log.""" with open( self.output_path / "all_boites_mismatches.json", "w", encoding="utf-8" ) as f: @@ -203,7 +235,6 @@ def _export_unified_log(self, all_mismatches: list) -> None: ) def execute(self) -> dict[str, list[dict]]: - """Export logs in Json and return records data in memory""" results_map, all_mismatches = {}, [] for box_file in os.listdir(self.data_path): if box_file.lower().endswith(".xlsx") and not box_file.startswith("~"): @@ -213,4 +244,4 @@ def execute(self) -> dict[str, list[dict]]: self._export_records(box_file, records) self._export_unified_log(all_mismatches) - return results_map + return results_map, all_mismatches diff --git a/refactory/file_import/xml_exporter.py b/refactory/file_import/xml_exporter.py index 76233f9..e119012 
--- a/refactory/file_import/xml_exporter.py
+++ b/refactory/file_import/xml_exporter.py
@@ -1,3 +1,4 @@
+import re
 import xml.etree.ElementTree as ET
 from xml.dom import minidom
 import tempfile
@@ -15,23 +16,34 @@ def __init__(self, output_path: str | None = None):
         else:
             self.base_dir = Path(tempfile.mkdtemp(prefix="boite_xmls_"))

+    def _get_description_for_type(self, url_key: str) -> str:
+        base_type = url_key.replace("_url", "")
+        clean_type = re.sub(r"_\d{3,4}$", "", base_type)
+
+        type_mapping = {
+            "pdf": "Fulltext PDF",
+            "pdf_latex": "Fulltext PDF_LaTeX",
+            "pdf_ocr": "Fulltext PDF_OCR",
+            "pdf_transmis": "Fulltext PDF_TRANSMIS",
+            "tiff": "Fulltext TIFF",
+        }
+
+        return type_mapping.get(clean_type, f"Fulltext {clean_type.upper()}")
+
     def _build_record_element(self, root: ET.Element, record: dict) -> None:
         record_node = ET.SubElement(root, "record")
         ET.SubElement(record_node, "controlfield", tag="001").text = str(
             record.get("record_id", "")
         )

-        if record.get("pdf_url"):
-            df = ET.SubElement(record_node, "datafield", tag="FFT", ind1=" ", ind2=" ")
-            ET.SubElement(df, "subfield", code="a").text = record["pdf_url"]
-            ET.SubElement(df, "subfield", code="t").text = "Main"
-            ET.SubElement(df, "subfield", code="d").text = "Fulltext PDF"
+        for key, value in record.items():
+            if key.endswith("_url") and value:
+                description = self._get_description_for_type(key)

-        if record.get("pdf_latex_url"):
-            df = ET.SubElement(record_node, "datafield", tag="FFT", ind1=" ", ind2=" ")
-            ET.SubElement(df, "subfield", code="a").text = record["pdf_latex_url"]
-            ET.SubElement(df, "subfield", code="t").text = "Main"
-            ET.SubElement(df, "subfield", code="d").text = "Fulltext PDF_LaTeX"
+                df = ET.SubElement(record_node, "datafield", tag="FFT", ind1=" ", ind2=" ")
+                ET.SubElement(df, "subfield", code="a").text = value
+                ET.SubElement(df, "subfield", code="t").text = "Main"
+                ET.SubElement(df, "subfield", code="d").text = description

     def _save_to_disk(self, root: ET.Element, filename: str) -> str:
         """Converts element tree to XML file."""
@@ -49,7 +61,9 @@ def generate_single(self, records: list[dict], filename: str) -> str | None:
         valid_records_count = 0

         for rec in records:
-            if not rec.get("pdf_url") and not rec.get("pdf_latex_url"):
+            has_valid_url = any(key.endswith("_url") and val for key, val in rec.items())
+
+            if not has_valid_url:
                 continue

             self._build_record_element(root, rec)
diff --git a/refactory/storage_connection.py b/refactory/storage_connection.py
index e169ab7..b52ae0a 100644
--- a/refactory/storage_connection.py
+++ b/refactory/storage_connection.py
@@ -41,7 +41,6 @@ def __init__(
         self.bucket = bucket

         if os.environ["ACCESS_KEY"] and os.environ["SECRET_KEY"]:
-            print("Logging into s3 using credentials provided in enviroment variables")
             self.s3 = boto3.client(
                 "s3",
                 aws_access_key_id=os.environ["ACCESS_KEY"],
@@ -49,7 +48,6 @@ def __init__(
                 endpoint_url=endpoint_url,
             )
         else:
-            print("Using default s3 login without credentials")
             self.s3 = boto3.client(
                 "s3",
                 endpoint_url=endpoint_url,
@@ -196,7 +194,6 @@ def create_folder(self, folder_path: str) -> None:
         response.raise_for_status()

     def upload_file(self, local_file_path: str, remote_file_path: str) -> None:
-        """Uploads the file. If the target folder does not exist, the server returns 409."""
         if self.is_public or not self.account or not self.password:
             raise ValueError(
                 "Error: CERN account and password are required for uploading."
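A quick sketch of the EOS path mapping that both upload paths rely on, assuming a hypothetical account `jdoe` and the `CERNBOX_USER`/`CERNBOX_PASSWORD` variables the README describes (the no-argument constructor picking them up is an assumption):

```python
# Sketch of the EOS path mapping added in PATCH 1/2; "jdoe" is a hypothetical
# account and the env-var names come from the README's CERNBox section.
import os

os.environ["CERNBOX_USER"] = "jdoe"
os.environ["CERNBOX_PASSWORD"] = "not-a-real-password"  # placeholder only

from refactory.storage_connection import CernboxProvider

cb = CernboxProvider()  # authenticated (non-public) mode assumed
# Relative paths get rooted under the user's EOS home:
assert cb._build_eos_path("xml_exports/BOITE_O0125.xml") == \
    "eos/user/j/jdoe/xml_exports/BOITE_O0125.xml"
# Paths already under eos/ pass through (after the leading slash is stripped):
assert cb._build_eos_path("/eos/user/j/jdoe/old.xml") == "eos/user/j/jdoe/old.xml"
```

The 409 handling in `upload_file` builds on the same mapping: when the first PUT fails because the parent collection is missing, `create_folder` issues a WebDAV MKCOL for the parent and the PUT is retried once.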
diff --git a/refactory/test_connections.py b/refactory/test_connections.py deleted file mode 100644 index 65467ce..0000000 --- a/refactory/test_connections.py +++ /dev/null @@ -1,60 +0,0 @@ -import tempfile -import os -from .storage_connection import S3Provider, CernboxProvider - -def test_s3(): - print("--- Testing AWS S3 connection ---") - try: - s3 = S3Provider(bucket="digitization-dev") - base_path = "cern-archives/raw/PDF/" - - folders = s3.list_folders(base_path) - print("✅ Read: Success! Connected to S3.") - print(f"Found {len(folders)} folders in '{base_path}'.") - - except Exception as e: - print("❌ Failed to connect/operate on S3.") - print(f"Details: {e}") - -def test_cernbox(): - print("\n--- Testing CERNBOX connection (Hybrid Mode) ---") - - # 1. Read Variables (Public) - public_hash = "QslvWRIPsBcDAOK" - read_base_path = "" # Relative path inside the public link - - # 2. Write Variables (Private/Authenticated) - cern_user = "gadesant" # CERN username - cern_password = os.environ.get("CERNBOX_PASSWORD") - write_base_path = "eos/user/g/gadesant/teste/"#"eos/user/{u}/{user}/teste/" - - if public_hash == "PUT_YOUR_PUBLIC_HASH_HERE": - print("Warning: Configure the public_hash in the code before testing.") - return - - if not cern_password: - print("❌ The CERNBOX_PASSWORD environment variable is not set.") - print("Run in terminal: export CERNBOX_PASSWORD='your_password'") - return - - try: - # Passing all three arguments - cernbox = CernboxProvider(public_link_hash=public_hash, account=cern_user, password=cern_password) - - print("\n[Phase 1: Reading from Public Link]") - folders = cernbox.list_folders(read_base_path) - print("✅ Read: Success (Anonymous)!") - print(f"Found {len(folders)} items at the root of the link.") - - print("\n[Phase 2: Writing via Authenticated WebDAV]") - - with tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='utf-8') as tmp: - tmp.write("Authenticated upload from test_connections.py") - - except Exception as e: - print("❌ Failed to connect/operate on CERNBOX.") - print(f"Details: {e}") - -if __name__ == "__main__": - test_s3() - # test_cernbox()
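Since this series deletes `test_connections.py` together with the README note pointing at it, a throwaway probe can stand in when storage connectivity needs checking. A minimal sketch using the README defaults (not part of the patch):

```python
# Ad-hoc replacement for the removed test_connections.py: a quick S3
# reachability probe. Bucket and base path are the README defaults; this
# assumes ACCESS_KEY/SECRET_KEY are exported as described in the README.
from refactory.storage_connection import S3Provider


def main() -> None:
    s3 = S3Provider(bucket="digitization-dev")
    keys = s3.list_files("cern-archives/raw/PDF/")
    print(f"S3 reachable; {len(keys)} objects under cern-archives/raw/PDF/")


if __name__ == "__main__":
    main()
```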