From 1e399e7efb7a088707105c7da1b76c5875af3853 Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Wed, 21 Jan 2026 00:19:32 -0600 Subject: [PATCH 01/12] Update Read the Docs configuration to use requirements file for Sphinx installation --- .readthedocs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.readthedocs.yml b/.readthedocs.yml index 4d67ad5..018cd19 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -14,7 +14,7 @@ python: install: - method: pip path: . - extra: docs + - requirements: docs/requirements.txt sphinx: configuration: docs/conf.py From 32a877a2d53ff8452500d377df0da210eb02ecd9 Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Wed, 21 Jan 2026 00:20:55 -0600 Subject: [PATCH 02/12] Add support for ShadowInstances and rerun failed records functionality - Introduced ShadowInstances as a new object type for consortium shadow copies. - Enhanced BatchPoster to handle rerunning failed records one at a time. - Updated documentation to reflect new features and usage instructions. - Added unit tests for ShadowInstances and rerun functionality. --- docs/source/batch_poster_guide.md | 81 ++++- docs/source/concepts.md | 38 ++ src/folio_data_import/BatchPoster.py | 230 +++++++++++- tests/test_batch_poster.py | 514 ++++++++++++++++++++++++++- 4 files changed, 856 insertions(+), 7 deletions(-) diff --git a/docs/source/batch_poster_guide.md b/docs/source/batch_poster_guide.md index 1ad5005..c7612e1 100644 --- a/docs/source/batch_poster_guide.md +++ b/docs/source/batch_poster_guide.md @@ -21,6 +21,7 @@ Batch Poster works exclusively with FOLIO Inventory storage records: | `Instances` | `/instance-storage/batch/synchronous` | Bibliographic records | | `Holdings` | `/holdings-storage/batch/synchronous` | Holdings records | | `Items` | `/item-storage/batch/synchronous` | Item records | +| `ShadowInstances` | `/instance-storage/batch/synchronous` | Consortium shadow instances (ECS) | ```{note} For other data types, use the appropriate tool: @@ -69,11 +70,12 @@ folio-data-import batch-poster \ | Parameter | Default | Description | |-----------|---------|-------------| -| `--object-type` | (required) | Record type: `Instances`, `Holdings`, or `Items` | +| `--object-type` | (required) | Record type: `Instances`, `Holdings`, `Items`, or `ShadowInstances` | | `--file-path` | (required) | Path(s) to JSONL file(s). Accepts multiple values and glob patterns | | `--batch-size` | 100 | Number of records per batch (1-1000) | | `--upsert` | false | Enable upsert mode to update existing records | | `--failed-records-file` | none | Path to file for writing failed records | +| `--rerun-failed-records` | false | After main run, reprocess failed records one at a time | | `--no-progress` | false | Disable progress bar display | #### Upsert Options @@ -183,6 +185,40 @@ folio-data-import batch-poster \ This updates **only** `barcode` and `materialTypeId` from your input file while preserving all other fields from the existing record. +## Protected Fields + +### Always-Preserved Fields + +Certain fields are **always preserved** from existing records during upsert, regardless of configuration: + +| Field | Applies To | Reason | +|-------|------------|--------| +| `hrid` | All record types | Human-readable ID - changing it breaks external references | +| `lastCheckIn` | Items | Circulation data - should not be overwritten by migrations | + +These fields cannot be overwritten through upsert operations. If you need to change an `hrid`, you must delete and recreate the record. + +### MARC-Sourced Instance Protection + +When updating Instance records that have a MARC source (i.e., `source` contains "MARC"), Batch Poster automatically restricts which fields can be patched. This protects MARC-managed fields from being overwritten, as they would be reverted on the next SRS update anyway. + +**Allowed fields for MARC-sourced Instances:** + +| Field | Purpose | +|-------|---------| +| `discoverySuppress` | Discovery suppression flag | +| `staffSuppress` | Staff suppression flag | +| `deleted` | Deletion flag | +| `statisticalCodeIds` | Statistical codes (merged with existing) | +| `administrativeNotes` | Administrative notes (merged with existing) | +| `instanceStatusId` | Instance status | + +**Example:** If you try to update the `title` of a MARC-sourced Instance, that change will be ignored to protect the MARC-managed data. + +```{note} +This protection is automatic. You don't need to configure anything - MARC-sourced records are detected and handled appropriately. +``` + ## Multiple Files Process multiple files in one run: @@ -308,6 +344,49 @@ folio-data-import batch-poster \ --patch-paths "barcode" ``` +### Rerun Failed Records + +When a batch fails, some records in that batch may have succeeded individually. The `--rerun-failed-records` flag automatically reprocesses failed records one at a time after the main run completes, giving each record a second chance: + +```bash +folio-data-import batch-poster \ + --object-type Items \ + --file-path items.jsonl \ + --upsert \ + --failed-records-file failed_items.jsonl \ + --rerun-failed-records +``` + +This will: +1. Process all records in batches (normal operation) +2. If any batches fail, reprocess those failed records individually +3. Write still-failing records to a new file with `_rerun` suffix (e.g., `failed_items_rerun.jsonl`) + +The original failed records file is preserved, and the rerun processes records in a streaming fashion without loading them all into memory. + +```{note} +`--rerun-failed-records` requires `--failed-records-file` to be set. +``` + +### Consortium Shadow Instances (ECS) + +For FOLIO ECS (consortium) environments, use `ShadowInstances` to post shadow copies to member tenants. This automatically converts the `source` field: + +- `MARC` → `CONSORTIUM-MARC` +- `FOLIO` → `CONSORTIUM-FOLIO` + +```bash +folio-data-import batch-poster \ + --gateway-url https://folio-snapshot-okapi.dev.folio.org \ + --tenant-id central \ + --member-tenant-id member1 \ + --username admin \ + --password admin \ + --object-type ShadowInstances \ + --file-path instances.jsonl \ + --upsert +``` + ## Environment Variables Set connection parameters as environment variables to simplify commands: diff --git a/docs/source/concepts.md b/docs/source/concepts.md index 0562806..db5703f 100644 --- a/docs/source/concepts.md +++ b/docs/source/concepts.md @@ -133,6 +133,7 @@ Batch Poster works with FOLIO Inventory storage records: - **Instances**: Bibliographic records - **Holdings**: Holdings records attached to instances - **Items**: Item records attached to holdings +- **ShadowInstances**: Consortium shadow copies (ECS environments) ### Input Format @@ -171,6 +172,22 @@ When updating existing records, you can preserve specific data: - `--preserve-temporary-loan-types`: Keep existing temporary loan type (Items only) - Item status is **preserved by default**; use `--overwrite-item-status` to change +### Always-Protected Fields + +Certain fields are **always preserved** from existing records, regardless of configuration: + +- `hrid` (human-readable ID): Changing it would break external references +- `lastCheckIn` (Items only): Circulation data that should not be overwritten + +### MARC Source Protection + +For Instance records with a MARC source (e.g., `source: "MARC"` or `source: "CONSORTIUM-MARC"`), Batch Poster automatically restricts patching to only these fields: + +- `discoverySuppress`, `staffSuppress`, `deleted` (suppression flags) +- `statisticalCodeIds`, `administrativeNotes`, `instanceStatusId` + +This prevents overwriting MARC-managed fields like title or contributors, which would be reverted on the next SRS update anyway. + ### Selective Patching For fine-grained updates, use `--patch-existing-records` with `--patch-paths`: @@ -181,6 +198,27 @@ For fine-grained updates, use `--patch-existing-records` with `--patch-paths`: This updates only the specified fields while preserving all others from the existing record. +### Consortium Shadow Instances + +For FOLIO ECS (consortium) environments, use `--object-type ShadowInstances` to post shadow copies to member tenants. This automatically converts the `source` field to consortium format: + +- `MARC` → `CONSORTIUM-MARC` +- `FOLIO` → `CONSORTIUM-FOLIO` + +Use `--member-tenant-id` to specify the target member tenant. + +### Rerunning Failed Records + +When `--rerun-failed-records` is enabled (along with `--failed-records-file`), the tool automatically reprocesses any failed records one at a time after the main batch run completes: + +```bash +folio-data-import batch-poster --object-type Items \ + --file-path items.jsonl --upsert \ + --failed-records-file failed.jsonl --rerun-failed-records +``` + +This streams through the failed records file, giving each record an individual retry. Records that still fail are written to a new file with `_rerun` suffix (e.g., `failed_rerun.jsonl`). + ### Workflow ``` diff --git a/src/folio_data_import/BatchPoster.py b/src/folio_data_import/BatchPoster.py index 97bed1a..8027cb5 100644 --- a/src/folio_data_import/BatchPoster.py +++ b/src/folio_data_import/BatchPoster.py @@ -79,6 +79,13 @@ def get_api_info(object_type: str) -> Dict[str, Any]: "is_batch": True, "supports_upsert": True, }, + "ShadowInstances": { + "object_name": "instances", + "api_endpoint": "/instance-storage/batch/synchronous", + "query_endpoint": "/instance-storage/instances", + "is_batch": True, + "supports_upsert": True, + }, } if object_type not in api_info: @@ -135,10 +142,13 @@ class Config(BaseModel): """Configuration for BatchPoster operations.""" object_type: Annotated[ - Literal["Instances", "Holdings", "Items"], + Literal["Instances", "Holdings", "Items", "ShadowInstances"], Field( title="Object type", - description="The type of inventory object to post: Instances, Holdings, or Items", + description=( + "The type of inventory object to post: Instances, Holdings, Items, " + "or ShadowInstances (for consortium shadow copies)" + ), ), ] batch_size: Annotated[ @@ -235,6 +245,16 @@ class Config(BaseModel): ), ), ] = None + rerun_failed_records: Annotated[ + bool, + Field( + title="Rerun failed records", + description=( + "After the main run, reprocess any failed records one at a time. " + "Requires --failed-records-file to be set." + ), + ), + ] = False no_progress: Annotated[ bool, Field( @@ -391,10 +411,21 @@ def keep_existing_fields(self, updates: dict, existing_record: dict) -> None: """ Preserve specific fields from existing record during upsert. + Always preserves ``hrid`` (human-readable ID) and ``lastCheckIn`` (circulation data) + from existing records to prevent data loss. Optionally preserves ``status`` + based on configuration. + Args: updates: Dictionary being prepared for update existing_record: The existing record in FOLIO """ + # Always preserve these fields - they should never be overwritten + always_preserve = ["hrid", "lastCheckIn"] + for key in always_preserve: + if key in existing_record: + updates[key] = existing_record[key] + + # Conditionally preserve item status if self.config.preserve_item_status and "status" in existing_record: updates["status"] = existing_record["status"] @@ -454,6 +485,10 @@ def prepare_record_for_upsert(self, new_record: dict, existing_record: dict) -> """ Prepare a record for upsert by adding version and patching fields. + For MARC-sourced Instance records, only suppression flags, deleted status, + statistical codes, administrative notes, and instance status are allowed + to be patched. This protects MARC-managed fields from being overwritten. + Args: new_record: The new record to prepare existing_record: The existing record in FOLIO @@ -461,8 +496,42 @@ def prepare_record_for_upsert(self, new_record: dict, existing_record: dict) -> # Set the version for optimistic locking new_record["_version"] = existing_record.get("_version", 1) - # Apply patching if configured - if self.config.patch_existing_records: + # Check if this is a MARC-sourced record (Instances only) + is_marc_record = ( + self.config.object_type == "Instances" + and "source" in existing_record + and "MARC" in existing_record.get("source", "") + ) + + if is_marc_record: + # For MARC records, only allow patching specific fields + # Filter patch_paths to only include allowed fields + allowed_marc_fields = {"discoverySuppress", "staffSuppress", "deleted"} + user_patch_paths = set(self.config.patch_paths or []) + + # Only keep suppression/deleted fields from user's patch_paths + restricted_paths = [ + path + for path in user_patch_paths + if any(allowed.lower() in path.lower() for allowed in allowed_marc_fields) + ] + + # Always allow these fields for MARC records + restricted_paths.extend( + ["statisticalCodeIds", "administrativeNotes", "instanceStatusId"] + ) + + if self.config.patch_existing_records and user_patch_paths: + logger.debug( + "Record %s is MARC-sourced, restricting patch to: %s", + existing_record.get("id", "unknown"), + restricted_paths, + ) + + self.patch_record(new_record, existing_record, restricted_paths) + + elif self.config.patch_existing_records: + # Apply patching with user-specified paths self.patch_record(new_record, existing_record, self.config.patch_paths or []) async def fetch_existing_records(self, record_ids: List[str]) -> Dict[str, dict]: @@ -516,6 +585,23 @@ async def fetch_batch(batch_ids: List[str]) -> dict: return existing_records + @staticmethod + def set_consortium_source(record: dict) -> None: + """ + Convert source field for consortium shadow instances. + + For shadow instances in ECS/consortium environments, the source field + must be prefixed with "CONSORTIUM-" to distinguish them from local records. + + Args: + record: The record to modify (modified in place) + """ + source = record.get("source", "") + if source == "MARC": + record["source"] = "CONSORTIUM-MARC" + elif source == "FOLIO": + record["source"] = "CONSORTIUM-FOLIO" + async def set_versions_for_upsert(self, batch: List[dict]) -> None: """ Fetch existing record versions and prepare batch for upsert. @@ -558,6 +644,11 @@ async def post_batch(self, batch: List[dict]) -> tuple[httpx.Response, int, int] num_creates = 0 num_updates = 0 + # For ShadowInstances, convert source to consortium format + if self.config.object_type == "ShadowInstances": + for record in batch: + self.set_consortium_source(record) + # If upsert mode, set versions and track which are updates if self.config.upsert: await self.set_versions_for_upsert(batch) @@ -922,6 +1013,116 @@ async def do_work( return self.stats + async def rerun_failed_records_one_by_one(self) -> None: + """ + Reprocess failed records one at a time. + + Streams through the failed records file, processing each record + individually. Records that still fail are written to a new file + with '_rerun' suffix. This gives each record a second chance + with individual error handling. + """ + if not self._failed_records_path or not self._failed_records_path.exists(): + logger.warning("No failed records file to rerun") + return + + # Close the file handle if we own it + if self._owns_file_handle and self._failed_records_file_handle: + self._failed_records_file_handle.close() + self._failed_records_file_handle = None + + # Count records first for logging + record_count = self._count_lines_in_file(self._failed_records_path) + if record_count == 0: + logger.info("No failed records to rerun") + return + + # Create new file for rerun failures with _rerun suffix + rerun_failed_path = self._failed_records_path.with_stem( + f"{self._failed_records_path.stem}_rerun" + ) + + logger.info("=" * 60) + logger.info("Rerunning %d failed records one at a time...", record_count) + logger.info("=" * 60) + + # Stream through failed records and process one at a time + rerun_success = 0 + rerun_failed = 0 + + # Wrap in reporter context for progress display + with self.reporter: + # Start a new progress task for the rerun + rerun_task_id = self.reporter.start_task( + f"rerun_{self.config.object_type}", + total=record_count, + description=f"Rerunning failed {self.config.object_type}", + ) + + with ( + open(self._failed_records_path, "r", encoding="utf-8") as infile, + open(rerun_failed_path, "w", encoding="utf-8") as outfile, + ): + for line in infile: + line = line.strip() + if not line: + continue + + try: + record = json.loads(line) + except json.JSONDecodeError: + logger.warning("Could not parse failed record line: %s", line[:100]) + outfile.write(line + "\n") + rerun_failed += 1 + self.stats.records_failed += 1 + self.reporter.update_task( + rerun_task_id, + advance=1, + succeeded=rerun_success, + failed=rerun_failed, + ) + continue + + record_id = record.get("id", "unknown") + try: + await self.post_batch([record]) + rerun_success += 1 + logger.debug("Rerun success for record %s", record_id) + except Exception as e: + outfile.write(json.dumps(record) + "\n") + rerun_failed += 1 + self.stats.records_failed += 1 + logger.debug("Rerun failed for record %s: %s", record_id, e) + + self.reporter.update_task( + rerun_task_id, + advance=1, + succeeded=rerun_success, + failed=rerun_failed, + ) + + # Finish the rerun task + self.reporter.finish_task(rerun_task_id) + + # Note: records_posted is already updated by post_batch() calls + # We only need to track the still-failing count for the rerun phase + + logger.info("Rerun complete: %d succeeded, %d still failing", rerun_success, rerun_failed) + if rerun_failed > 0: + logger.info("Still-failing records written to: %s", rerun_failed_path) + else: + # Remove empty rerun file + rerun_failed_path.unlink(missing_ok=True) + + def _count_lines_in_file(self, file_path: Path) -> int: + """Count non-empty lines in a file.""" + count = 0 + with open(file_path, "r", encoding="utf-8") as f: + for line in f: + if line.strip(): + count += 1 + return count + def get_stats(self) -> BatchPosterStats: """ Get current posting statistics. @@ -1041,7 +1242,7 @@ def main( ), ] = None, object_type: Annotated[ - Literal["Instances", "Holdings", "Items"] | None, + Literal["Instances", "Holdings", "Items", "ShadowInstances"] | None, cyclopts.Parameter(group="Job Configuration Parameters"), ] = None, file_paths: Annotated[ @@ -1097,6 +1298,13 @@ def main( Path | None, cyclopts.Parameter(group="Job Configuration Parameters"), ] = None, + rerun_failed_records: Annotated[ + bool, + cyclopts.Parameter( + help="After the main run, reprocess failed records one at a time.", + group="Job Configuration Parameters", + ), + ] = False, no_progress: Annotated[ bool, cyclopts.Parameter(group="Job Configuration Parameters"), @@ -1124,6 +1332,7 @@ def main( patch_existing_records: Enable selective field patching during upsert. patch_paths: Comma-separated list of field paths to patch. failed_records_file: Path to file for writing failed records. + rerun_failed_records: After the main run, reprocess failed records one at a time. no_progress: Disable progress bar display. """ set_up_cli_logging() @@ -1144,6 +1353,11 @@ def main( # Parse patch_paths if provided patch_paths_list = parse_patch_paths(patch_paths) + # Validate rerun_failed_records requires failed_records_file + if rerun_failed_records and not failed_records_file: + logger.critical("--rerun-failed-records requires --failed-records-file to be set") + sys.exit(1) + try: if config_file: config, files_to_process = parse_config_file(config_file) @@ -1167,6 +1381,7 @@ def main( preserve_item_status=not overwrite_item_status, patch_existing_records=patch_existing_records, patch_paths=patch_paths_list, + rerun_failed_records=rerun_failed_records, no_progress=no_progress, ) files_to_process = expanded_file_paths @@ -1241,6 +1456,11 @@ async def run_batch_poster( ) async with poster: await poster.do_work(files_to_process) + + # If rerun_failed_records is enabled and there are failures, reprocess them + if config.rerun_failed_records and poster.stats.records_failed > 0: + await poster.rerun_failed_records_one_by_one() + log_final_stats(poster) except Exception as e: diff --git a/tests/test_batch_poster.py b/tests/test_batch_poster.py index 3636a6f..d36913b 100644 --- a/tests/test_batch_poster.py +++ b/tests/test_batch_poster.py @@ -480,4 +480,516 @@ async def test_do_work_multiple_files(config, mock_folio_client, tmp_path): assert stats.batches_posted == 2 # Verify async_httpx_client.post was called - assert mock_folio_client.async_httpx_client.post.call_count == 2 \ No newline at end of file + assert mock_folio_client.async_httpx_client.post.call_count == 2 + + +class TestRerunFailedRecords: + """Tests for rerun_failed_records_one_by_one method.""" + + @pytest.mark.asyncio + async def test_rerun_no_failed_records_file(self, config, mock_folio_client): + """Test rerun with no failed records file configured.""" + async with BatchPoster(mock_folio_client, config) as poster: + # Should log warning and return without error + await poster.rerun_failed_records_one_by_one() + # No assertions needed - just verify it doesn't crash + + @pytest.mark.asyncio + async def test_rerun_empty_failed_records_file( + self, config, mock_folio_client, tmp_path + ): + """Test rerun with empty failed records file.""" + failed_file = tmp_path / "failed.jsonl" + failed_file.write_text("") + + async with BatchPoster(mock_folio_client, config) as poster: + # Manually set the path (don't use failed_records_file param which truncates) + poster._failed_records_path = failed_file + poster._owns_file_handle = False + await poster.rerun_failed_records_one_by_one() + # Should return early without processing + + @pytest.mark.asyncio + async def test_rerun_all_succeed(self, config, mock_folio_client, tmp_path): + """Test rerun where all records succeed on retry.""" + failed_file = tmp_path / "failed.jsonl" + failed_records = [ + {"id": "fail1", "barcode": "bc1"}, + {"id": "fail2", "barcode": "bc2"}, + ] + with open(failed_file, "w", encoding="utf-8") as f: + for record in failed_records: + f.write(json.dumps(record) + "\n") + + async with BatchPoster(mock_folio_client, config) as poster: + # Manually set the path (don't use failed_records_file param which truncates) + poster._failed_records_path = failed_file + poster._owns_file_handle = False + + await poster.rerun_failed_records_one_by_one() + + # Both records should have been reprocessed + assert poster.stats.records_posted == 2 + assert poster.stats.records_failed == 0 + + # Rerun file should be removed (empty) + rerun_file = tmp_path / "failed_rerun.jsonl" + assert not rerun_file.exists() + + @pytest.mark.asyncio + async def test_rerun_some_still_fail(self, config, mock_folio_client, tmp_path): + """Test rerun where some records still fail.""" + failed_file = tmp_path / "failed.jsonl" + failed_records = [ + {"id": "fail1", "barcode": "bc1"}, + {"id": "fail2", "barcode": "bc2"}, + {"id": "fail3", "barcode": "bc3"}, + ] + with open(failed_file, "w", encoding="utf-8") as f: + for record in failed_records: + f.write(json.dumps(record) + "\n") + + # Make second record fail again + call_count = 0 + + async def mock_post(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 2: + raise Exception("Still failing") + mock_response = Mock() + mock_response.json = Mock(return_value={}) + mock_response.raise_for_status = Mock() + mock_response.elapsed.total_seconds = Mock(return_value=0.1) + mock_request = Mock() + mock_request.method = "POST" + mock_request.url = "https://test" + mock_request.headers = {} + mock_request.content = b"{}" + mock_response.request = mock_request + return mock_response + + mock_folio_client.async_httpx_client.post = mock_post + + async with BatchPoster(mock_folio_client, config) as poster: + # Manually set the path (don't use failed_records_file param which truncates) + poster._failed_records_path = failed_file + poster._owns_file_handle = False + + await poster.rerun_failed_records_one_by_one() + + # 2 succeeded, 1 still failed + assert poster.stats.records_posted == 2 + assert poster.stats.records_failed == 1 + + # Rerun file should exist with the still-failing record + rerun_file = tmp_path / "failed_rerun.jsonl" + assert rerun_file.exists() + with open(rerun_file, "r", encoding="utf-8") as f: + still_failing = [json.loads(line) for line in f if line.strip()] + assert len(still_failing) == 1 + assert still_failing[0]["id"] == "fail2" + + @pytest.mark.asyncio + async def test_rerun_preserves_original_file( + self, config, mock_folio_client, tmp_path + ): + """Test that rerun preserves the original failed records file.""" + failed_file = tmp_path / "failed.jsonl" + failed_records = [{"id": "fail1", "barcode": "bc1"}] + with open(failed_file, "w", encoding="utf-8") as f: + for record in failed_records: + f.write(json.dumps(record) + "\n") + + original_content = failed_file.read_text() + + async with BatchPoster(mock_folio_client, config) as poster: + # Manually set the path (don't use failed_records_file param which truncates) + poster._failed_records_path = failed_file + poster._owns_file_handle = False + + await poster.rerun_failed_records_one_by_one() + + # Original file should still exist and be unchanged + assert failed_file.exists() + assert failed_file.read_text() == original_content + + @pytest.mark.asyncio + async def test_rerun_creates_file_with_rerun_suffix( + self, config, mock_folio_client, tmp_path + ): + """Test that rerun creates new file with _rerun suffix.""" + failed_file = tmp_path / "my_failed_items.jsonl" + failed_records = [{"id": "fail1", "barcode": "bc1"}] + with open(failed_file, "w", encoding="utf-8") as f: + for record in failed_records: + f.write(json.dumps(record) + "\n") + + # Make the record fail again + mock_folio_client.async_httpx_client.post = AsyncMock( + side_effect=Exception("Still failing") + ) + + async with BatchPoster(mock_folio_client, config) as poster: + # Manually set the path (don't use failed_records_file param which truncates) + poster._failed_records_path = failed_file + poster._owns_file_handle = False + + await poster.rerun_failed_records_one_by_one() + + # New file should have _rerun suffix + rerun_file = tmp_path / "my_failed_items_rerun.jsonl" + assert rerun_file.exists() + + @pytest.mark.asyncio + async def test_rerun_handles_invalid_json_lines( + self, config, mock_folio_client, tmp_path + ): + """Test that rerun handles invalid JSON lines gracefully.""" + failed_file = tmp_path / "failed.jsonl" + with open(failed_file, "w", encoding="utf-8") as f: + f.write('{"id": "valid1", "barcode": "bc1"}\n') + f.write("not valid json\n") + f.write('{"id": "valid2", "barcode": "bc2"}\n') + + async with BatchPoster(mock_folio_client, config) as poster: + # Manually set the path (don't use failed_records_file param which truncates) + poster._failed_records_path = failed_file + poster._owns_file_handle = False + + await poster.rerun_failed_records_one_by_one() + + # Valid records should be posted, invalid should be in rerun file + assert poster.stats.records_posted == 2 + assert poster.stats.records_failed == 1 + + rerun_file = tmp_path / "failed_rerun.jsonl" + assert rerun_file.exists() + with open(rerun_file, "r", encoding="utf-8") as f: + still_failing = [line.strip() for line in f if line.strip()] + assert len(still_failing) == 1 + assert still_failing[0] == "not valid json" + + @pytest.mark.asyncio + async def test_rerun_updates_progress_reporter( + self, config, mock_folio_client, tmp_path + ): + """Test that rerun updates the progress reporter.""" + from folio_data_import._progress import NoOpProgressReporter + + failed_file = tmp_path / "failed.jsonl" + failed_records = [ + {"id": "fail1", "barcode": "bc1"}, + {"id": "fail2", "barcode": "bc2"}, + ] + with open(failed_file, "w", encoding="utf-8") as f: + for record in failed_records: + f.write(json.dumps(record) + "\n") + + # Create a mock reporter to track calls + mock_reporter = Mock(spec=NoOpProgressReporter) + mock_reporter.start_task = Mock(return_value="rerun_task_id") + mock_reporter.update_task = Mock() + mock_reporter.finish_task = Mock() + mock_reporter.__enter__ = Mock(return_value=mock_reporter) + mock_reporter.__exit__ = Mock(return_value=None) + + async with BatchPoster( + mock_folio_client, + config, + reporter=mock_reporter, + ) as poster: + # Manually set the path (don't use failed_records_file param which truncates) + poster._failed_records_path = failed_file + poster._owns_file_handle = False + + await poster.rerun_failed_records_one_by_one() + + # Progress reporter should have been used + mock_reporter.start_task.assert_called_once() + assert mock_reporter.update_task.call_count == 2 # Once per record + mock_reporter.finish_task.assert_called_once() + + +class TestShadowInstances: + """Tests for ShadowInstances object type support.""" + + def test_get_api_info_shadow_instances(self): + """Test that ShadowInstances returns valid API info.""" + info = get_api_info("ShadowInstances") + assert info["object_name"] == "instances" + assert info["api_endpoint"] == "/instance-storage/batch/synchronous" + assert info["supports_upsert"] is True + + def test_shadow_instances_config(self): + """Test creating config with ShadowInstances object type.""" + config = BatchPoster.Config(object_type="ShadowInstances") + assert config.object_type == "ShadowInstances" + + +class TestSetConsortiumSource: + """Tests for set_consortium_source static method.""" + + def test_convert_marc_to_consortium_marc(self): + """Test MARC source is converted to CONSORTIUM-MARC.""" + record = {"id": "123", "source": "MARC"} + BatchPoster.set_consortium_source(record) + assert record["source"] == "CONSORTIUM-MARC" + + def test_convert_folio_to_consortium_folio(self): + """Test FOLIO source is converted to CONSORTIUM-FOLIO.""" + record = {"id": "123", "source": "FOLIO"} + BatchPoster.set_consortium_source(record) + assert record["source"] == "CONSORTIUM-FOLIO" + + def test_no_conversion_for_other_sources(self): + """Test that other source values are not modified.""" + record = {"id": "123", "source": "LINKED_DATA"} + BatchPoster.set_consortium_source(record) + assert record["source"] == "LINKED_DATA" + + def test_no_conversion_when_source_missing(self): + """Test that records without source field are not modified.""" + record = {"id": "123", "title": "Test"} + BatchPoster.set_consortium_source(record) + assert "source" not in record or record.get("source") == "" + + def test_no_conversion_for_empty_source(self): + """Test that empty source is not modified.""" + record = {"id": "123", "source": ""} + BatchPoster.set_consortium_source(record) + assert record["source"] == "" + + +class TestKeepExistingFields: + """Tests for keep_existing_fields method preserving hrid and lastCheckIn.""" + + @pytest.mark.asyncio + async def test_always_preserves_hrid(self, mock_folio_client): + """Test that hrid is always preserved from existing record.""" + config = BatchPoster.Config(object_type="Items") + async with BatchPoster(mock_folio_client, config) as poster: + updates = {"id": "123", "barcode": "NEW"} + existing = {"id": "123", "hrid": "it00000001", "barcode": "OLD"} + + poster.keep_existing_fields(updates, existing) + + assert updates["hrid"] == "it00000001" + + @pytest.mark.asyncio + async def test_always_preserves_last_check_in(self, mock_folio_client): + """Test that lastCheckIn is always preserved from existing record.""" + config = BatchPoster.Config(object_type="Items") + async with BatchPoster(mock_folio_client, config) as poster: + updates = {"id": "123", "barcode": "NEW"} + existing = { + "id": "123", + "lastCheckIn": {"dateTime": "2024-01-15T10:30:00Z"}, + "barcode": "OLD", + } + + poster.keep_existing_fields(updates, existing) + + assert updates["lastCheckIn"] == {"dateTime": "2024-01-15T10:30:00Z"} + + @pytest.mark.asyncio + async def test_preserves_status_when_configured(self, mock_folio_client): + """Test that status is preserved when preserve_item_status is True.""" + config = BatchPoster.Config(object_type="Items", preserve_item_status=True) + async with BatchPoster(mock_folio_client, config) as poster: + updates = {"id": "123", "status": {"name": "Available"}} + existing = {"id": "123", "status": {"name": "Checked out"}} + + poster.keep_existing_fields(updates, existing) + + assert updates["status"] == {"name": "Checked out"} + + @pytest.mark.asyncio + async def test_does_not_preserve_status_when_not_configured(self, mock_folio_client): + """Test that status is not preserved when preserve_item_status is False.""" + config = BatchPoster.Config(object_type="Items", preserve_item_status=False) + async with BatchPoster(mock_folio_client, config) as poster: + updates = {"id": "123", "status": {"name": "Available"}} + existing = {"id": "123", "status": {"name": "Checked out"}} + + poster.keep_existing_fields(updates, existing) + + # Status should remain as the new value + assert updates["status"] == {"name": "Available"} + + @pytest.mark.asyncio + async def test_handles_missing_fields_gracefully(self, mock_folio_client): + """Test that missing fields in existing record don't cause errors.""" + config = BatchPoster.Config(object_type="Items") + async with BatchPoster(mock_folio_client, config) as poster: + updates = {"id": "123", "barcode": "NEW"} + existing = {"id": "123", "barcode": "OLD"} # No hrid or lastCheckIn + + poster.keep_existing_fields(updates, existing) + + # Should not add fields that don't exist + assert "hrid" not in updates + assert "lastCheckIn" not in updates + + +class TestMARCSourceProtection: + """Tests for MARC source protection in prepare_record_for_upsert.""" + + @pytest.mark.asyncio + async def test_marc_record_restricts_patch_paths(self, mock_folio_client): + """Test that MARC-sourced records have restricted patch paths.""" + config = BatchPoster.Config( + object_type="Instances", + patch_existing_records=True, + patch_paths=["title", "contributors", "discoverySuppress"], + ) + async with BatchPoster(mock_folio_client, config) as poster: + new_record = { + "id": "123", + "title": "New Title", + "contributors": [{"name": "New Author"}], + "discoverySuppress": True, + } + existing_record = { + "id": "123", + "source": "MARC", + "title": "Original Title", + "contributors": [{"name": "Original Author"}], + "discoverySuppress": False, + "_version": 5, + } + + poster.prepare_record_for_upsert(new_record, existing_record) + + # Version should be set + assert new_record["_version"] == 5 + # discoverySuppress should be updated (allowed for MARC) + assert new_record["discoverySuppress"] is True + # Title should NOT be updated (not allowed for MARC) + assert new_record["title"] == "Original Title" + + @pytest.mark.asyncio + async def test_marc_record_allows_statistical_codes(self, mock_folio_client): + """Test that MARC records allow statisticalCodeIds to be patched.""" + config = BatchPoster.Config( + object_type="Instances", + patch_existing_records=True, + patch_paths=["statisticalCodeIds"], + ) + async with BatchPoster(mock_folio_client, config) as poster: + new_record = { + "id": "123", + "statisticalCodeIds": ["stat-code-1", "stat-code-2"], + } + existing_record = { + "id": "123", + "source": "MARC", + "statisticalCodeIds": [], + "_version": 3, + } + + poster.prepare_record_for_upsert(new_record, existing_record) + + # statisticalCodeIds should be patchable + assert "stat-code-1" in new_record.get("statisticalCodeIds", []) + + @pytest.mark.asyncio + async def test_consortium_marc_treated_as_marc(self, mock_folio_client): + """Test that CONSORTIUM-MARC source is also protected.""" + config = BatchPoster.Config( + object_type="Instances", + patch_existing_records=True, + patch_paths=["title"], + ) + async with BatchPoster(mock_folio_client, config) as poster: + new_record = {"id": "123", "title": "New Title"} + existing_record = { + "id": "123", + "source": "CONSORTIUM-MARC", + "title": "Original Title", + "_version": 2, + } + + poster.prepare_record_for_upsert(new_record, existing_record) + + # Title should NOT be updated for CONSORTIUM-MARC + assert new_record["title"] == "Original Title" + + @pytest.mark.asyncio + async def test_folio_source_allows_all_patches(self, mock_folio_client): + """Test that FOLIO-sourced records allow all patches.""" + config = BatchPoster.Config( + object_type="Instances", + patch_existing_records=True, + patch_paths=["title", "contributors"], + ) + async with BatchPoster(mock_folio_client, config) as poster: + new_record = { + "id": "123", + "title": "New Title", + "contributors": [{"name": "New Author"}], + } + existing_record = { + "id": "123", + "source": "FOLIO", + "title": "Original Title", + "contributors": [{"name": "Original Author"}], + "_version": 4, + } + + poster.prepare_record_for_upsert(new_record, existing_record) + + # All fields should be patchable for FOLIO source + assert new_record["title"] == "New Title" + + @pytest.mark.asyncio + async def test_non_instance_types_not_affected(self, mock_folio_client): + """Test that MARC protection only applies to Instances.""" + config = BatchPoster.Config( + object_type="Items", + patch_existing_records=True, + patch_paths=["barcode"], + ) + async with BatchPoster(mock_folio_client, config) as poster: + new_record = {"id": "123", "barcode": "NEW-BARCODE"} + existing_record = { + "id": "123", + "barcode": "OLD-BARCODE", + "_version": 1, + } + + poster.prepare_record_for_upsert(new_record, existing_record) + + # Items should allow all patches regardless of source + assert new_record["barcode"] == "NEW-BARCODE" + + +@pytest.mark.asyncio +class TestShadowInstancesPostBatch: + """Tests for ShadowInstances source conversion during post_batch.""" + + async def test_shadow_instances_converts_source(self, mock_folio_client): + """Test that ShadowInstances converts MARC to CONSORTIUM-MARC.""" + config = BatchPoster.Config(object_type="ShadowInstances") + batch = [ + {"id": "1", "title": "Book 1", "source": "MARC"}, + {"id": "2", "title": "Book 2", "source": "FOLIO"}, + ] + + async with BatchPoster(mock_folio_client, config) as poster: + await poster.post_batch(batch) + + # Verify the records were modified before posting + assert batch[0]["source"] == "CONSORTIUM-MARC" + assert batch[1]["source"] == "CONSORTIUM-FOLIO" + + async def test_regular_instances_does_not_convert_source(self, mock_folio_client): + """Test that regular Instances does not convert source.""" + config = BatchPoster.Config(object_type="Instances") + batch = [{"id": "1", "title": "Book 1", "source": "MARC"}] + + async with BatchPoster(mock_folio_client, config) as poster: + await poster.post_batch(batch) + + # Source should remain unchanged for regular Instances + assert batch[0]["source"] == "MARC" \ No newline at end of file From 7c93607e8b02cff1e8080b8edcd97fffd446acb7 Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Wed, 21 Jan 2026 00:40:11 -0600 Subject: [PATCH 03/12] Refine record failure tracking in rerun logic and update tests for clarity --- src/folio_data_import/BatchPoster.py | 5 ++--- tests/test_batch_poster.py | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/folio_data_import/BatchPoster.py b/src/folio_data_import/BatchPoster.py index 8027cb5..180ba02 100644 --- a/src/folio_data_import/BatchPoster.py +++ b/src/folio_data_import/BatchPoster.py @@ -513,7 +513,7 @@ def prepare_record_for_upsert(self, new_record: dict, existing_record: dict) -> restricted_paths = [ path for path in user_patch_paths - if any(allowed.lower() in path.lower() for allowed in allowed_marc_fields) + if any(allowed.lower() == path.lower() for allowed in allowed_marc_fields) ] # Always allow these fields for MARC records @@ -1074,7 +1074,6 @@ async def rerun_failed_records_one_by_one(self) -> None: logger.warning("Could not parse failed record line: %s", line[:100]) outfile.write(line + "\n") rerun_failed += 1 - self.stats.records_failed += 1 self.reporter.update_task( rerun_task_id, advance=1, @@ -1091,7 +1090,7 @@ async def rerun_failed_records_one_by_one(self) -> None: except Exception as e: outfile.write(json.dumps(record) + "\n") rerun_failed += 1 - self.stats.records_failed += 1 + logger.debug("Rerun failed for record %s: %s", record_id, e) self.reporter.update_task( diff --git a/tests/test_batch_poster.py b/tests/test_batch_poster.py index d36913b..e04a873 100644 --- a/tests/test_batch_poster.py +++ b/tests/test_batch_poster.py @@ -578,9 +578,8 @@ async def mock_post(*args, **kwargs): await poster.rerun_failed_records_one_by_one() - # 2 succeeded, 1 still failed + # 2 succeeded, 1 still failed (rerun failures tracked separately, not in stats) assert poster.stats.records_posted == 2 - assert poster.stats.records_failed == 1 # Rerun file should exist with the still-failing record rerun_file = tmp_path / "failed_rerun.jsonl" @@ -660,8 +659,8 @@ async def test_rerun_handles_invalid_json_lines( await poster.rerun_failed_records_one_by_one() # Valid records should be posted, invalid should be in rerun file + # (rerun failures tracked separately, not in stats) assert poster.stats.records_posted == 2 - assert poster.stats.records_failed == 1 rerun_file = tmp_path / "failed_rerun.jsonl" assert rerun_file.exists() @@ -752,7 +751,7 @@ def test_no_conversion_when_source_missing(self): """Test that records without source field are not modified.""" record = {"id": "123", "title": "Test"} BatchPoster.set_consortium_source(record) - assert "source" not in record or record.get("source") == "" + assert "source" not in record def test_no_conversion_for_empty_source(self): """Test that empty source is not modified.""" From a278e5863912f2f94fd520404728f5b38f1916c8 Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Wed, 21 Jan 2026 10:51:46 -0600 Subject: [PATCH 04/12] Enhance test_rerun_no_failed_records_file docstring to clarify expected behavior --- tests/test_batch_poster.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_batch_poster.py b/tests/test_batch_poster.py index e04a873..31e9f97 100644 --- a/tests/test_batch_poster.py +++ b/tests/test_batch_poster.py @@ -488,11 +488,12 @@ class TestRerunFailedRecords: @pytest.mark.asyncio async def test_rerun_no_failed_records_file(self, config, mock_folio_client): - """Test rerun with no failed records file configured.""" + """Test rerun with no failed records file configured. + + Should log warning and return without error - no assertions needed. + """ async with BatchPoster(mock_folio_client, config) as poster: - # Should log warning and return without error await poster.rerun_failed_records_one_by_one() - # No assertions needed - just verify it doesn't crash @pytest.mark.asyncio async def test_rerun_empty_failed_records_file( From a4de1725ea9e467d77f7a90e7158de49861adefa Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Wed, 21 Jan 2026 16:00:18 -0600 Subject: [PATCH 05/12] Optimize line counting method for improved performance in file processing --- src/folio_data_import/BatchPoster.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/folio_data_import/BatchPoster.py b/src/folio_data_import/BatchPoster.py index 180ba02..2e50a77 100644 --- a/src/folio_data_import/BatchPoster.py +++ b/src/folio_data_import/BatchPoster.py @@ -1114,13 +1114,9 @@ async def rerun_failed_records_one_by_one(self) -> None: rerun_failed_path.unlink(missing_ok=True) def _count_lines_in_file(self, file_path: Path) -> int: - """Count non-empty lines in a file.""" - count = 0 - with open(file_path, "r", encoding="utf-8") as f: - for line in f: - if line.strip(): - count += 1 - return count + """Count lines in a file using efficient binary newline counting.""" + with open(file_path, "rb") as f: + return sum(buf.count(b"\n") for buf in iter(lambda: f.read(1024 * 1024), b"")) def get_stats(self) -> BatchPosterStats: """ From c7920b9d7da85bf2cc5e7485a7d23dc1b8d09afa Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Wed, 21 Jan 2026 16:01:07 -0600 Subject: [PATCH 06/12] Update GitHub Actions workflow to use updated actions and streamline project installation --- .github/workflows/python-package.yml | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 5110dbd..ef75857 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -15,19 +15,18 @@ jobs: matrix: python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + - uses: actions/checkout@v6 + - name: Install uv + uses: astral-sh/setup-uv@v7 with: python-version: ${{ matrix.python-version }} - cache: pip - cache-dependency-path: '**/pyproject.toml' - - name: Install poetry - run: | - python -m pip install poetry - - name: Install project with dependencies - run: | - poetry install --with dev + enable-cache: true + + - name: Install the project + run: uv sync --locked --all-extras --dev + + - name: Install project + run: uv sync --locked --all-extras --dev + - name: Run tests - run: | - poetry run pytest + run: uv run pytest tests/ From 0b0ff2d0829745496e6c96d7c89b6442d8be58db Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Wed, 21 Jan 2026 16:02:37 -0600 Subject: [PATCH 07/12] Remove erroneous locked option from project installation step in GitHub Actions workflow --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index ef75857..901b76e 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -26,7 +26,7 @@ jobs: run: uv sync --locked --all-extras --dev - name: Install project - run: uv sync --locked --all-extras --dev + run: uv sync --all-extras --dev - name: Run tests run: uv run pytest tests/ From b6877a1268e2de6f834e413635da25f2726f34b2 Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Wed, 21 Jan 2026 16:04:01 -0600 Subject: [PATCH 08/12] Fix workflow --- .github/workflows/python-package.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 901b76e..8c0c014 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -22,9 +22,6 @@ jobs: python-version: ${{ matrix.python-version }} enable-cache: true - - name: Install the project - run: uv sync --locked --all-extras --dev - - name: Install project run: uv sync --all-extras --dev From 7a7b3d8483a301ed3979b25db766bae88edd3cea Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Thu, 22 Jan 2026 00:42:45 -0600 Subject: [PATCH 09/12] Update Python version to 3.13 and add formats section in Read the Docs configuration --- .readthedocs.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.readthedocs.yml b/.readthedocs.yml index 018cd19..24c1636 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -7,10 +7,9 @@ version: 2 build: os: ubuntu-22.04 tools: - python: "3.10" + python: "3.13" python: - version: 3.10 install: - method: pip path: . @@ -19,3 +18,6 @@ python: sphinx: configuration: docs/conf.py fail_on_warning: false + +formats: + - htmlzip From cf23343f3bf90ded550a9ce270ed44633564fbf2 Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Thu, 22 Jan 2026 01:29:23 -0600 Subject: [PATCH 10/12] Bump version to 0.5.0b5 in pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 2df456b..de3452c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "folio_data_import" -version = "0.5.0b4" +version = "0.5.0b5" description = "A python module to perform bulk import of data into a FOLIO environment. Currently supports MARC and user data import." authors = [{ name = "Brooks Travis", email = "brooks.travis@gmail.com" }] license = "MIT" From 87f712e04cb1c5bf0322ae1c48b5f53ee9ac5104 Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Thu, 22 Jan 2026 15:35:47 -0600 Subject: [PATCH 11/12] Add assertions to verify rerun stats in TestRerunFailedRecords --- tests/test_batch_poster.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_batch_poster.py b/tests/test_batch_poster.py index 31e9f97..e2cae5a 100644 --- a/tests/test_batch_poster.py +++ b/tests/test_batch_poster.py @@ -581,6 +581,9 @@ async def mock_post(*args, **kwargs): # 2 succeeded, 1 still failed (rerun failures tracked separately, not in stats) assert poster.stats.records_posted == 2 + # Verify rerun stats are populated for final reporting + assert poster.stats.rerun_succeeded == 2 + assert poster.stats.rerun_still_failed == 1 # Rerun file should exist with the still-failing record rerun_file = tmp_path / "failed_rerun.jsonl" From f8b366cb24958a150d28050c680073e42375ceb4 Mon Sep 17 00:00:00 2001 From: Brooks Travis Date: Thu, 22 Jan 2026 15:35:58 -0600 Subject: [PATCH 12/12] Refactor logging setup and enhance BatchPoster and MARCDataImport functionality - Consolidate logging setup in __init__.py with custom logger and filters. - Add rerun statistics tracking in BatchPoster for improved reporting. - Update MARCDataImport and UserImport to utilize new logging setup and parameters. - Remove redundant logging functions from MARCDataImport and UserImport. --- src/folio_data_import/BatchPoster.py | 64 +++++--------- src/folio_data_import/MARCDataImport.py | 98 +++------------------ src/folio_data_import/UserImport.py | 54 +++--------- src/folio_data_import/__init__.py | 110 ++++++++++++++++++++++++ 4 files changed, 156 insertions(+), 170 deletions(-) diff --git a/src/folio_data_import/BatchPoster.py b/src/folio_data_import/BatchPoster.py index 2e50a77..a8cc291 100644 --- a/src/folio_data_import/BatchPoster.py +++ b/src/folio_data_import/BatchPoster.py @@ -10,23 +10,21 @@ import json import logging import sys -from datetime import datetime as dt from io import TextIOWrapper from pathlib import Path from typing import Annotated, Any, Dict, Generator, List, Literal, Union import cyclopts import folioclient -from folioclient import FolioClient import httpx +from folioclient import FolioClient from pydantic import BaseModel, Field -from rich.logging import RichHandler -from folio_data_import import get_folio_connection_parameters +from folio_data_import import get_folio_connection_parameters, set_up_cli_logging from folio_data_import._progress import ( - RichProgressReporter, - ProgressReporter, NoOpProgressReporter, + ProgressReporter, + RichProgressReporter, ) logger = logging.getLogger(__name__) @@ -42,6 +40,8 @@ class BatchPosterStats(BaseModel): records_failed: int = 0 batches_posted: int = 0 batches_failed: int = 0 + rerun_succeeded: int = 0 + rerun_still_failed: int = 0 def get_api_info(object_type: str) -> Dict[str, Any]: @@ -1103,8 +1103,9 @@ async def rerun_failed_records_one_by_one(self) -> None: # Finish the rerun task self.reporter.finish_task(rerun_task_id) - # Note: records_posted is already updated by post_batch() calls - # We only need to track the still-failing count for the rerun phase + # Store rerun results in stats for final reporting + self.stats.rerun_succeeded = rerun_success + self.stats.rerun_still_failed = rerun_failed logger.info("Rerun complete: %d succeeded, %d still failing", rerun_success, rerun_failed) if rerun_failed > 0: @@ -1158,41 +1159,6 @@ def get_req_size(response: httpx.Response): return get_human_readable_size(len(size.encode("utf-8"))) -def set_up_cli_logging() -> None: - """ - This function sets up logging for the CLI. - """ - - logger.setLevel(logging.INFO) - logger.propagate = False - - # Set up file and stream handlers - file_handler = logging.FileHandler( - "folio_batch_poster_{}.log".format(dt.now().strftime("%Y%m%d%H%M%S")) - ) - file_handler.setLevel(logging.INFO) - file_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") - file_handler.setFormatter(file_formatter) - logger.addHandler(file_handler) - - if not any( - isinstance(h, logging.StreamHandler) and h.stream == sys.stderr for h in logger.handlers - ): - stream_handler = RichHandler( - show_level=False, - show_time=False, - omit_repeated_times=False, - show_path=False, - ) - stream_handler.setLevel(logging.INFO) - stream_formatter = logging.Formatter("%(message)s") - stream_handler.setFormatter(stream_formatter) - logger.addHandler(stream_handler) - - # Stop httpx from logging info messages to the console - logging.getLogger("httpx").setLevel(logging.WARNING) - - app = cyclopts.App(default_parameter=cyclopts.Parameter(negative=())) @@ -1304,6 +1270,12 @@ def main( bool, cyclopts.Parameter(group="Job Configuration Parameters"), ] = False, + debug: Annotated[ + bool, + cyclopts.Parameter( + name=["--debug"], group="General Parameters", help="Enable debug logging" + ), + ] = False, ) -> None: """ Command-line interface to batch post inventory records to FOLIO @@ -1329,8 +1301,9 @@ def main( failed_records_file: Path to file for writing failed records. rerun_failed_records: After the main run, reprocess failed records one at a time. no_progress: Disable progress bar display. + debug: Enable debug logging. """ - set_up_cli_logging() + set_up_cli_logging(logger, "folio_batch_poster", debug) gateway_url, tenant_id, username, password = get_folio_connection_parameters( gateway_url, tenant_id, username, password @@ -1482,6 +1455,9 @@ def log_final_stats(poster: BatchPoster) -> None: logger.info("Records failed: %d", poster.stats.records_failed) logger.info("Total batches posted: %d", poster.stats.batches_posted) logger.info("Total batches failed: %d", poster.stats.batches_failed) + if poster.config.rerun_failed_records: + logger.info("Rerun succeeded: %d", poster.stats.rerun_succeeded) + logger.info("Rerun still failed: %d", poster.stats.rerun_still_failed) if poster._failed_records_path: logger.info("Failed records written to: %s", poster._failed_records_path) diff --git a/src/folio_data_import/MARCDataImport.py b/src/folio_data_import/MARCDataImport.py index 2c9101c..273ef90 100644 --- a/src/folio_data_import/MARCDataImport.py +++ b/src/folio_data_import/MARCDataImport.py @@ -23,10 +23,13 @@ import tabulate from humps import decamelize from pydantic import BaseModel, Field -from rich.logging import RichHandler from folio_data_import import __version__ as app_version -from folio_data_import import get_folio_connection_parameters +from folio_data_import import ( + CustomLogger, + get_folio_connection_parameters, + set_up_cli_logging, +) from folio_data_import._progress import ( NoOpProgressReporter, ProgressReporter, @@ -52,10 +55,6 @@ RETRY_TIMEOUT_RETRY_FACTOR = 1.5 RETRY_TIMEOUT_MAX = 25.32 -# Custom log level for data issues, set to 26 -DATA_ISSUE_LVL_NUM = 26 -logging.addLevelName(DATA_ISSUE_LVL_NUM, "DATA_ISSUES") - class MARCImportStats(BaseModel): """Statistics for MARC import operations.""" @@ -68,18 +67,6 @@ class MARCImportStats(BaseModel): error: int = 0 -class CustomLogger(logging.Logger): - """Logger subclass with custom data_issues method.""" - - def data_issues(self, msg: str, *args, **kws) -> None: - """Log data issues at custom level (26).""" - if self.isEnabledFor(DATA_ISSUE_LVL_NUM): - self._log(DATA_ISSUE_LVL_NUM, msg, args, **kws) - - -# Set the custom logger class as the default -logging.setLoggerClass(CustomLogger) - logger: CustomLogger = logging.getLogger(__name__) # type: ignore[assignment] @@ -966,54 +953,6 @@ async def get_job_summary(self) -> dict: return job_summary -def set_up_cli_logging() -> None: - """ - This function sets up logging for the CLI. - """ - logger.setLevel(logging.INFO) - logger.propagate = False - - # Set up file and stream handlers - file_handler = logging.FileHandler( - "folio_data_import_{}.log".format(dt.now().strftime("%Y%m%d%H%M%S")) - ) - file_handler.setLevel(logging.INFO) - file_handler.addFilter(ExcludeLevelFilter(DATA_ISSUE_LVL_NUM)) - # file_handler.addFilter(IncludeLevelFilter(25)) - file_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") - file_handler.setFormatter(file_formatter) - logger.addHandler(file_handler) - - if not any( - isinstance(h, logging.StreamHandler) and h.stream == sys.stderr for h in logger.handlers - ): - stream_handler = RichHandler( - show_level=False, - show_time=False, - omit_repeated_times=False, - show_path=False, - ) - stream_handler.setLevel(logging.INFO) - stream_handler.addFilter(ExcludeLevelFilter(DATA_ISSUE_LVL_NUM)) - # stream_handler.addFilter(ExcludeLevelFilter(25)) - stream_formatter = logging.Formatter("%(message)s") - stream_handler.setFormatter(stream_formatter) - logger.addHandler(stream_handler) - - # Set up data issues logging - data_issues_handler = logging.FileHandler( - "marc_import_data_issues_{}.log".format(dt.now().strftime("%Y%m%d%H%M%S")) - ) - data_issues_handler.setLevel(26) - data_issues_handler.addFilter(IncludeLevelFilter(DATA_ISSUE_LVL_NUM)) - data_issues_formatter = logging.Formatter("%(message)s") - data_issues_handler.setFormatter(data_issues_formatter) - logger.addHandler(data_issues_handler) - - # Stop httpx from logging info messages to the console - logging.getLogger("httpx").setLevel(logging.WARNING) - - app = cyclopts.App(version=app_version) @@ -1104,6 +1043,12 @@ def main( job_ids_file_path: Annotated[ str | None, cyclopts.Parameter(group="Job Configuration Parameters") ] = None, + debug: Annotated[ + bool, + cyclopts.Parameter( + name=["--debug"], group="General Parameters", help="Enable debug logging" + ), + ] = False, ) -> None: """ Command-line interface to batch import MARC records into FOLIO using FOLIO Data Import @@ -1130,8 +1075,9 @@ def main( let_summary_fail (bool): Let the final summary check fail without exiting. preprocessor_config (str): Path to JSON config file for the preprocessor. job_ids_file_path (str): Path to file to write job IDs to. + debug (bool): Enable debug logging. """ # noqa: E501 - set_up_cli_logging() + set_up_cli_logging(logger, "folio_marc_data_import", debug, True) gateway_url, tenant_id, username, password = get_folio_connection_parameters( gateway_url, tenant_id, username, password ) @@ -1257,23 +1203,5 @@ async def run_job(job: MARCImportJob): await job.wrap_up() -class ExcludeLevelFilter(logging.Filter): - def __init__(self, level) -> None: - super().__init__() - self.level = level - - def filter(self, record): - return record.levelno != self.level - - -class IncludeLevelFilter(logging.Filter): - def __init__(self, level) -> None: - super().__init__() - self.level = level - - def filter(self, record): - return record.levelno == self.level - - if __name__ == "__main__": app() diff --git a/src/folio_data_import/UserImport.py b/src/folio_data_import/UserImport.py index 4efef0f..fe955ee 100644 --- a/src/folio_data_import/UserImport.py +++ b/src/folio_data_import/UserImport.py @@ -6,10 +6,10 @@ import sys import time import uuid -from io import TextIOWrapper from datetime import datetime as dt +from io import TextIOWrapper from pathlib import Path -from typing import List, Literal, Tuple, Annotated +from typing import Annotated, List, Literal, Tuple import aiofiles import cyclopts @@ -17,13 +17,12 @@ import httpx from aiofiles.threadpool.text import AsyncTextIOWrapper from pydantic import BaseModel, Field -from rich.logging import RichHandler -from folio_data_import import get_folio_connection_parameters +from folio_data_import import get_folio_connection_parameters, set_up_cli_logging from folio_data_import._progress import ( - RichProgressReporter, - ProgressReporter, NoOpProgressReporter, + ProgressReporter, + RichProgressReporter, ) try: @@ -1022,40 +1021,6 @@ def get_stats(self) -> UserImporterStats: return self.stats -def set_up_cli_logging() -> None: - """ - This function sets up logging for the CLI. - """ - logger.setLevel(logging.INFO) - logger.propagate = False - - # Set up file and stream handlers - file_handler = logging.FileHandler( - "folio_user_import_{}.log".format(dt.now().strftime("%Y%m%d%H%M%S")) - ) - file_handler.setLevel(logging.INFO) - file_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") - file_handler.setFormatter(file_formatter) - logger.addHandler(file_handler) - - if not any( - isinstance(h, logging.StreamHandler) and h.stream == sys.stderr for h in logger.handlers - ): - stream_handler = RichHandler( - show_level=False, - show_time=False, - omit_repeated_times=False, - show_path=False, - ) - stream_handler.setLevel(logging.WARNING) - stream_formatter = logging.Formatter("%(message)s") - stream_handler.setFormatter(stream_formatter) - logger.addHandler(stream_handler) - - # Stop httpx from logging info messages to the console - logging.getLogger("httpx").setLevel(logging.WARNING) - - app = cyclopts.App() @@ -1153,6 +1118,12 @@ def main( cyclopts.Parameter(group="Job Configuration Parameters"), ] = "email", no_progress: Annotated[bool, cyclopts.Parameter(group="Job Configuration Parameters")] = False, + debug: Annotated[ + bool, + cyclopts.Parameter( + name=["--debug"], group="General Parameters", help="Enable debug logging" + ), + ] = False, ) -> None: """ Command-line interface to batch import users into FOLIO @@ -1175,8 +1146,9 @@ def main( user_match_key (str): The key to match users (externalSystemId, username, barcode). default_preferred_contact_type (str): The default preferred contact type for users no_progress (bool): Whether to disable the progress bar. + debug (bool): Enable debug logging. """ # noqa: E501 - set_up_cli_logging() + set_up_cli_logging(logger, "folio_user_import", debug, stream_level=logging.WARNING) fields_to_protect = fields_to_protect or "" protect_fields = [f.strip() for f in fields_to_protect.split(",") if f.strip()] diff --git a/src/folio_data_import/__init__.py b/src/folio_data_import/__init__.py index a7ea867..edc3673 100644 --- a/src/folio_data_import/__init__.py +++ b/src/folio_data_import/__init__.py @@ -1,5 +1,10 @@ import importlib.metadata +import logging +import sys +from datetime import datetime as dt + import questionary +from rich.logging import RichHandler def get_folio_connection_parameters( @@ -28,4 +33,109 @@ def get_folio_connection_parameters( return gateway_url, tenant_id, username, password +# Logging setup and customizations + +# Custom log level for data issues, set to 26 +DATA_ISSUE_LVL_NUM = 26 +logging.addLevelName(DATA_ISSUE_LVL_NUM, "DATA_ISSUES") + + +class CustomLogger(logging.Logger): + """Logger subclass with custom data_issues method.""" + + def data_issues(self, msg: str, *args, **kws) -> None: + """Log data issues at custom level (26).""" + if self.isEnabledFor(DATA_ISSUE_LVL_NUM): + self._log(DATA_ISSUE_LVL_NUM, msg, args, **kws) + + +class ExcludeLevelFilter(logging.Filter): + def __init__(self, level) -> None: + super().__init__() + self.level = level + + def filter(self, record): + return record.levelno != self.level + + +class IncludeLevelFilter(logging.Filter): + def __init__(self, level) -> None: + super().__init__() + self.level = level + + def filter(self, record): + return record.levelno == self.level + + +# Set the custom logger class as the default +logging.setLoggerClass(CustomLogger) + + +def set_up_cli_logging( + logger: logging.Logger, + log_file_prefix: str, + debug: bool = False, + log_data_issues: bool = False, + stream_level: int = logging.INFO, +) -> None: + """ + This function sets up logging for the CLI. + + Parameters: + logger (logging.Logger): The logger to configure. + log_file_prefix (str): The prefix for the log file name. + debug (bool): Whether to enable debug logging. + log_data_issues (bool): Whether to enable logging of data issues. + stream_level (int): The logging level for the stream handler (default: logging.INFO). + """ + if debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + logger.propagate = False + + # Set up file and stream handlers + file_handler = logging.FileHandler( + "{}_{}.log".format(log_file_prefix, dt.now().strftime("%Y%m%d%H%M%S")) + ) + file_handler.setLevel(logging.DEBUG if debug else logging.INFO) + if log_data_issues: + file_handler.addFilter(ExcludeLevelFilter(DATA_ISSUE_LVL_NUM)) + # file_handler.addFilter(IncludeLevelFilter(25)) + file_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + file_handler.setFormatter(file_formatter) + logger.addHandler(file_handler) + + if not any( + isinstance(h, logging.StreamHandler) and h.stream == sys.stderr for h in logger.handlers + ): + stream_handler = RichHandler( + show_level=False, + show_time=False, + omit_repeated_times=False, + show_path=False, + ) + stream_handler.setLevel(logging.DEBUG if debug else stream_level) + if log_data_issues: + stream_handler.addFilter(ExcludeLevelFilter(DATA_ISSUE_LVL_NUM)) + stream_formatter = logging.Formatter("%(message)s") + stream_handler.setFormatter(stream_formatter) + logger.addHandler(stream_handler) + + # Set up data issues logging + if log_data_issues: + data_issues_handler = logging.FileHandler( + "data_issues_{}_{}.log".format(log_file_prefix, dt.now().strftime("%Y%m%d%H%M%S")) + ) + data_issues_handler.setLevel(26) + data_issues_handler.addFilter(IncludeLevelFilter(DATA_ISSUE_LVL_NUM)) + data_issues_formatter = logging.Formatter("%(message)s") + data_issues_handler.setFormatter(data_issues_formatter) + logger.addHandler(data_issues_handler) + + # Stop httpx from logging info messages to the console + logging.getLogger("httpx").setLevel(logging.WARNING) + + __version__ = importlib.metadata.version("folio-data-import")