From cc1bbdd61793eb9dc1b862793a0d3e2ccee68a4e Mon Sep 17 00:00:00 2001 From: Ian Later Date: Mon, 13 Apr 2026 17:31:52 -0700 Subject: [PATCH 1/9] python(feat): Test results logging now happens in a subprocess while test runs. --- .../low_level_wrappers/test_results.py | 568 ++++++++++++------ .../_tests/resources/test_test_results.py | 23 +- .../lib/sift_client/_tests/util/conftest.py | 47 +- .../resources/sync_stubs/__init__.pyi | 5 +- .../lib/sift_client/resources/test_results.py | 6 +- .../scripts/import_test_result_log.py | 65 +- .../sift_client/util/test_results/__init__.py | 11 +- .../util/test_results/context_manager.py | 67 ++- .../util/test_results/pytest_util.py | 65 +- 9 files changed, 595 insertions(+), 262 deletions(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py index 0c6499694..e9d871e4d 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py @@ -1,12 +1,13 @@ from __future__ import annotations +import fcntl import json import logging import re import uuid from dataclasses import dataclass, field from pathlib import Path -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, Any, Generator, cast from google.protobuf import json_format from sift.test_reports.v1.test_reports_pb2 import ( @@ -78,30 +79,6 @@ def __init__(self, grpc_client: GrpcClient): """ super().__init__(grpc_client) - @staticmethod - def _log_request_to_file( - log_file: str | Path, - request_type: str, - request: Any, - response_id: str | None = None, - ) -> None: - """Log a request to a file in JSON format. - - Args: - log_file: Path to the log file. - request_type: Type of request being logged. - request: The protobuf request to log. - response_id: Optional ID from the simulated response, embedded in the tag - for create operations so replay can map previously simulated IDs used by simulated updates. - """ - log_path = Path(log_file) - log_path.parent.mkdir(parents=True, exist_ok=True) - tag = f"{request_type}:{response_id}" if response_id else request_type - with open(log_path, "a") as f: - request_dict = json_format.MessageToDict(request) - request_json = json.dumps(request_dict, separators=(",", ":")) - f.write(f"[{tag}] {request_json}\n") - @staticmethod def simulate_create_test_report_response( request: CreateTestReportRequest, @@ -366,21 +343,28 @@ async def import_test_report(self, remote_file_id: str) -> TestReport: async def create_test_report( self, *, - test_report: TestReportCreate, + test_report: TestReportCreate | None = None, log_file: str | Path | None = None, + request: CreateTestReportRequest | None = None, + simulate: bool = False, ) -> TestReport: """Create a new test report. Args: test_report: The test report to create. log_file: If set, log the request to this file and return a simulated response. + request: Raw protobuf request (mutually exclusive with test_report). + simulate: If True, return a simulated response without making an API call. Returns: The created TestReport. """ - request = test_report.to_proto() + if request is None: + if test_report is None: + raise ValueError("Either test_report or request must be provided") + request = test_report.to_proto() - if log_file is not None: + if log_file is not None or simulate: simulated_proto = self.simulate_create_test_report_response(request) if log_file is not None: self._log_request_to_file( @@ -479,9 +463,11 @@ async def list_all_test_reports( async def update_test_report( self, - update: TestReportUpdate, + update: TestReportUpdate | None = None, log_file: str | Path | None = None, existing: TestReport | None = None, + request: UpdateTestReportRequest | None = None, + simulate: bool = False, ) -> TestReport: """Update an existing test report. @@ -490,14 +476,19 @@ async def update_test_report( log_file: If set, log the request to this file and return a simulated response. existing: The full existing TestReport for simulation merge. If not provided, the simulated response will only contain the updated fields. + request: Raw protobuf request (mutually exclusive with update). + simulate: If True, return a simulated response without making an API call. Returns: The updated TestReport. """ - test_report_proto, field_mask = update.to_proto_with_mask() - request = UpdateTestReportRequest(test_report=test_report_proto, update_mask=field_mask) + if request is None: + if update is None: + raise ValueError("Either update or request must be provided") + test_report_proto, field_mask = update.to_proto_with_mask() + request = UpdateTestReportRequest(test_report=test_report_proto, update_mask=field_mask) - if log_file is not None: + if log_file is not None or simulate: if log_file is not None: self._log_request_to_file(log_file, "UpdateTestReport", request) return self.simulate_update_test_report_response(request, existing=existing) @@ -525,21 +516,28 @@ async def delete_test_report(self, test_report_id: str) -> None: async def create_test_step( self, - test_step: TestStepCreate, + test_step: TestStepCreate | None = None, log_file: str | Path | None = None, + request: CreateTestStepRequest | None = None, + simulate: bool = False, ) -> TestStep: """Create a new test step. Args: test_step: The test step to create. log_file: If set, log the request to this file and return a simulated response. + request: Raw protobuf request (mutually exclusive with test_step). + simulate: If True, return a simulated response without making an API call. Returns: The created TestStep. """ - request = CreateTestStepRequest(test_step=test_step.to_proto()) + if request is None: + if test_step is None: + raise ValueError("Either test_step or request must be provided") + request = CreateTestStepRequest(test_step=test_step.to_proto()) - if log_file is not None: + if log_file is not None or simulate: simulated_proto = self.simulate_create_test_step_response(request) if log_file is not None: self._log_request_to_file( @@ -618,9 +616,11 @@ async def list_all_test_steps( async def update_test_step( self, - update: TestStepUpdate, + update: TestStepUpdate | None = None, log_file: str | Path | None = None, existing: TestStep | None = None, + request: UpdateTestStepRequest | None = None, + simulate: bool = False, ) -> TestStep: """Update an existing test step. @@ -629,17 +629,22 @@ async def update_test_step( log_file: If set, log the request to this file and return a simulated response. existing: The full existing TestStep for simulation merge. If not provided, the simulated response will only contain the updated fields. + request: Raw protobuf request (mutually exclusive with update). + simulate: If True, return a simulated response without making an API call. Returns: The updated TestStep. """ - test_step_proto, field_mask = update.to_proto_with_mask() - has_error_info = test_step_proto.HasField("error_info") - if has_error_info: - field_mask.paths.append("error_info") - request = UpdateTestStepRequest(test_step=test_step_proto, update_mask=field_mask) - - if log_file is not None: + if request is None: + if update is None: + raise ValueError("Either update or request must be provided") + test_step_proto, field_mask = update.to_proto_with_mask() + has_error_info = test_step_proto.HasField("error_info") + if has_error_info: + field_mask.paths.append("error_info") + request = UpdateTestStepRequest(test_step=test_step_proto, update_mask=field_mask) + + if log_file is not None or simulate: if log_file is not None: self._log_request_to_file(log_file, "UpdateTestStep", request) return self.simulate_update_test_step_response(request, existing=existing) @@ -667,21 +672,28 @@ async def delete_test_step(self, test_step_id: str) -> None: async def create_test_measurement( self, - test_measurement: TestMeasurementCreate, + test_measurement: TestMeasurementCreate | None = None, log_file: str | Path | None = None, + request: CreateTestMeasurementRequest | None = None, + simulate: bool = False, ) -> TestMeasurement: """Create a new test measurement. Args: test_measurement: The test measurement to create. log_file: If set, log the request to this file and return a simulated response. + request: Raw protobuf request (mutually exclusive with test_measurement). + simulate: If True, return a simulated response without making an API call. Returns: The created TestMeasurement. """ - request = CreateTestMeasurementRequest(test_measurement=test_measurement.to_proto()) + if request is None: + if test_measurement is None: + raise ValueError("Either test_measurement or request must be provided") + request = CreateTestMeasurementRequest(test_measurement=test_measurement.to_proto()) - if log_file is not None: + if log_file is not None or simulate: simulated_proto = self.simulate_create_test_measurement_response(request) if log_file is not None: self._log_request_to_file( @@ -700,22 +712,29 @@ async def create_test_measurement( async def create_test_measurements( self, - test_measurements: list[TestMeasurementCreate], + test_measurements: list[TestMeasurementCreate] | None = None, log_file: str | Path | None = None, + request: CreateTestMeasurementsRequest | None = None, + simulate: bool = False, ) -> tuple[int, list[str]]: """Create multiple test measurements in a single request. Args: test_measurements: The test measurements to create. log_file: If set, log the request to this file and return a simulated response. + request: Raw protobuf request (mutually exclusive with test_measurements). + simulate: If True, return a simulated response without making an API call. Returns: A tuple of (measurements_created_count, measurement_ids). """ - measurement_protos = [tm.to_proto() for tm in test_measurements] - request = CreateTestMeasurementsRequest(test_measurements=measurement_protos) + if request is None: + if test_measurements is None: + raise ValueError("Either test_measurements or request must be provided") + measurement_protos = [tm.to_proto() for tm in test_measurements] + request = CreateTestMeasurementsRequest(test_measurements=measurement_protos) - if log_file is not None: + if log_file is not None or simulate: count, measurement_ids = self.simulate_create_test_measurements_response(request) if log_file is not None: self._log_request_to_file( @@ -798,9 +817,11 @@ async def list_all_test_measurements( async def update_test_measurement( self, - update: TestMeasurementUpdate, + update: TestMeasurementUpdate | None = None, log_file: str | Path | None = None, existing: TestMeasurement | None = None, + request: UpdateTestMeasurementRequest | None = None, + simulate: bool = False, ) -> TestMeasurement: """Update an existing test measurement. @@ -809,16 +830,21 @@ async def update_test_measurement( log_file: If set, log the request to this file and return a simulated response. existing: The full existing TestMeasurement for simulation merge. If not provided, the simulated response will only contain the updated fields. + request: Raw protobuf request (mutually exclusive with update). + simulate: If True, return a simulated response without making an API call. Returns: The updated TestMeasurement. """ - test_measurement_proto, field_mask = update.to_proto_with_mask() - request = UpdateTestMeasurementRequest( - test_measurement=test_measurement_proto, update_mask=field_mask - ) + if request is None: + if update is None: + raise ValueError("Either update or request must be provided") + test_measurement_proto, field_mask = update.to_proto_with_mask() + request = UpdateTestMeasurementRequest( + test_measurement=test_measurement_proto, update_mask=field_mask + ) - if log_file is not None: + if log_file is not None or simulate: if log_file is not None: self._log_request_to_file(log_file, "UpdateTestMeasurement", request) return self.simulate_update_test_measurement_response(request, existing=existing) @@ -847,15 +873,23 @@ async def delete_test_measurement(self, measurement_id: str) -> None: async def import_log_file( self, log_file: str | Path, + incremental: bool = False, ) -> ReplayResult: - """Replay a log file by parsing each entry, simulating the results, then creating for real. + """Replay a log file, creating real API objects from the logged simulation data. - This method reads a log file created by the simulation logging, reconstructs - all the objects via simulation, and then creates them via the actual API. - IDs are mapped from simulated to real during the creation process. + Two modes are available: + + * **batch** (default): Parse the entire log, reconstruct objects via + simulation, then create them all via the API in one pass. The + ``LogTracking`` header on line 0 is ignored. + * **incremental** (``incremental=True``): Walk the log line-by-line, + issuing the real API call for each entry as it is encountered. + ``LogTracking.last_uploaded_line`` is updated after every successful + call so that a subsequent invocation skips already-uploaded lines. Args: log_file: Path to the log file to replay. + incremental: If True, use incremental mode. Returns: A ReplayResult containing the created report, steps, and measurements. @@ -864,123 +898,168 @@ async def import_log_file( if not log_path.exists(): raise FileNotFoundError(f"Log file not found: {log_file}") - simulated_report: TestReport | None = None - simulated_steps_by_id: dict[str, TestStep] = {} - simulated_steps_order: list[str] = [] - simulated_measurements_by_id: dict[str, TestMeasurement] = {} - simulated_measurements_order: list[str] = [] + if incremental: + return await self._incremental_import_log_file(log_path) - line_pattern = re.compile(r"^\[(\w+)(?::([^\]]+))?\]\s*(.+)$") + return await self._batch_import_log_file(log_path) - # Parse the log file and simulate the responses (without calling the API). - with open(log_path) as f: - for line in f: - line = line.strip() - if not line: - continue - - match = line_pattern.match(line) - if not match: - raise ValueError(f"Skipping malformed log line: {line[:100]}...") - - request_type = match.group(1) - response_id = match.group(2) - json_str = match.group(3) - - if request_type == "CreateTestReport": - create_report_req = CreateTestReportRequest() - json_format.Parse(json_str, create_report_req) - report_proto = self.simulate_create_test_report_response(create_report_req) - if response_id: - report_proto.test_report_id = response_id - simulated_report = TestReport._from_proto(report_proto) - - elif request_type == "CreateTestStep": - create_step_req = CreateTestStepRequest() - json_format.Parse(json_str, create_step_req) - step_proto = self.simulate_create_test_step_response(create_step_req) - if response_id: - step_proto.test_step_id = response_id - step = TestStep._from_proto(step_proto) - simulated_steps_by_id[step._id_or_error] = step - simulated_steps_order.append(step._id_or_error) - - elif request_type == "CreateTestMeasurement": - create_meas_req = CreateTestMeasurementRequest() - json_format.Parse(json_str, create_meas_req) - meas_proto = self.simulate_create_test_measurement_response(create_meas_req) - if response_id: - meas_proto.measurement_id = response_id - measurement = TestMeasurement._from_proto(meas_proto) - simulated_measurements_by_id[measurement._id_or_error] = measurement - simulated_measurements_order.append(measurement._id_or_error) - - elif request_type == "CreateTestMeasurements": - create_batch_req = CreateTestMeasurementsRequest() - json_format.Parse(json_str, create_batch_req) - original_ids = response_id.split(",") if response_id else [] - for i, tm_proto in enumerate(create_batch_req.test_measurements): - single_request = CreateTestMeasurementRequest(test_measurement=tm_proto) - batch_meas_proto = self.simulate_create_test_measurement_response( - single_request - ) - if i < len(original_ids): - batch_meas_proto.measurement_id = original_ids[i] - measurement = TestMeasurement._from_proto(batch_meas_proto) - simulated_measurements_by_id[measurement._id_or_error] = measurement - simulated_measurements_order.append(measurement._id_or_error) - - elif request_type == "UpdateTestReport": - if simulated_report is None: - raise ValueError("UpdateTestReport found before CreateTestReport") - update_report_req = UpdateTestReportRequest() - json_format.Parse(json_str, update_report_req) - simulated_report = self.simulate_update_test_report_response( - update_report_req, existing=simulated_report - ) - - elif request_type == "UpdateTestStep": - update_step_req = UpdateTestStepRequest() - json_format.Parse(json_str, update_step_req) - step_id = update_step_req.test_step.test_step_id - if step_id not in simulated_steps_by_id: - raise ValueError(f"UpdateTestStep for unknown step: {step_id}") - simulated_steps_by_id[step_id] = self.simulate_update_test_step_response( - update_step_req, existing=simulated_steps_by_id[step_id] - ) - - elif request_type == "UpdateTestMeasurement": - update_meas_req = UpdateTestMeasurementRequest() - json_format.Parse(json_str, update_meas_req) - measurement_id = update_meas_req.test_measurement.measurement_id - if measurement_id not in simulated_measurements_by_id: - raise ValueError( - f"UpdateTestMeasurement for unknown measurement: {measurement_id}" - ) - simulated_measurements_by_id[measurement_id] = ( - self.simulate_update_test_measurement_response( - update_meas_req, - existing=simulated_measurements_by_id[measurement_id], - ) - ) - - else: - logger.warning(f"Unknown request type: {request_type}") - - # Send the test report to the server, making sure to update the IDs to real ones as we go. - if simulated_report is None: + # ------------------------------------------------------------------ + # Shared replay dispatch + # ------------------------------------------------------------------ + + async def _import_entry( + self, + request_type: str, + response_id: str | None, + json_str: str, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + """Process a single log entry, updating *state* in place. + + When *simulate* is True the create/update methods return simulated + responses (no network call). When False they issue real gRPC calls. + *id_map* is updated so that subsequent entries can remap IDs that + were generated during the original simulation run. + """ + + def _map_id(sid: str) -> str: + return id_map.get(sid, sid) + + if request_type == "CreateTestReport": + create_report_req = CreateTestReportRequest() + json_format.Parse(json_str, create_report_req) + state.report = await self.create_test_report( + request=create_report_req, simulate=simulate + ) + if response_id: + id_map[response_id] = state.report._id_or_error + + elif request_type == "CreateTestStep": + create_step_req = CreateTestStepRequest() + json_format.Parse(json_str, create_step_req) + create_step_req.test_step.test_report_id = _map_id( + create_step_req.test_step.test_report_id + ) + if create_step_req.test_step.parent_step_id: + create_step_req.test_step.parent_step_id = _map_id( + create_step_req.test_step.parent_step_id + ) + step = await self.create_test_step(request=create_step_req, simulate=simulate) + if response_id: + id_map[response_id] = step._id_or_error + state.steps_by_id[step._id_or_error] = step + state.steps_order.append(step._id_or_error) + + elif request_type == "CreateTestMeasurement": + create_meas_req = CreateTestMeasurementRequest() + json_format.Parse(json_str, create_meas_req) + create_meas_req.test_measurement.test_step_id = _map_id( + create_meas_req.test_measurement.test_step_id + ) + measurement = await self.create_test_measurement( + request=create_meas_req, simulate=simulate + ) + if response_id: + id_map[response_id] = measurement._id_or_error + state.measurements_by_id[measurement._id_or_error] = measurement + state.measurements_order.append(measurement._id_or_error) + + elif request_type == "CreateTestMeasurements": + create_batch_req = CreateTestMeasurementsRequest() + json_format.Parse(json_str, create_batch_req) + for tm in create_batch_req.test_measurements: + tm.test_step_id = _map_id(tm.test_step_id) + original_ids = response_id.split(",") if response_id else [] + if simulate: + for i, tm_proto in enumerate(create_batch_req.test_measurements): + single_req = CreateTestMeasurementRequest(test_measurement=tm_proto) + meas = await self.create_test_measurement(request=single_req, simulate=True) + if i < len(original_ids): + id_map[original_ids[i]] = meas._id_or_error + state.measurements_by_id[meas._id_or_error] = meas + state.measurements_order.append(meas._id_or_error) + else: + _, real_ids = await self.create_test_measurements(request=create_batch_req) + for i, real_id in enumerate(real_ids): + if i < len(original_ids): + id_map[original_ids[i]] = real_id + + elif request_type == "UpdateTestReport": + if state.report is None: + raise ValueError("UpdateTestReport found before CreateTestReport") + update_report_req = UpdateTestReportRequest() + json_format.Parse(json_str, update_report_req) + update_report_req.test_report.test_report_id = _map_id( + update_report_req.test_report.test_report_id + ) + state.report = await self.update_test_report( + request=update_report_req, simulate=simulate, existing=state.report + ) + + elif request_type == "UpdateTestStep": + update_step_req = UpdateTestStepRequest() + json_format.Parse(json_str, update_step_req) + orig_step_id = update_step_req.test_step.test_step_id + mapped_step_id = _map_id(orig_step_id) + update_step_req.test_step.test_step_id = mapped_step_id + existing_step = state.steps_by_id.get(mapped_step_id) + if simulate and existing_step is None: + raise ValueError(f"UpdateTestStep for unknown step: {orig_step_id}") + updated_step = await self.update_test_step( + request=update_step_req, simulate=simulate, existing=existing_step + ) + if mapped_step_id in state.steps_by_id: + state.steps_by_id[mapped_step_id] = updated_step + + elif request_type == "UpdateTestMeasurement": + update_meas_req = UpdateTestMeasurementRequest() + json_format.Parse(json_str, update_meas_req) + orig_meas_id = update_meas_req.test_measurement.measurement_id + mapped_meas_id = _map_id(orig_meas_id) + update_meas_req.test_measurement.measurement_id = mapped_meas_id + existing_meas = state.measurements_by_id.get(mapped_meas_id) + if simulate and existing_meas is None: + raise ValueError(f"UpdateTestMeasurement for unknown measurement: {orig_meas_id}") + updated_meas = await self.update_test_measurement( + request=update_meas_req, simulate=simulate, existing=existing_meas + ) + if mapped_meas_id in state.measurements_by_id: + state.measurements_by_id[mapped_meas_id] = updated_meas + + # ------------------------------------------------------------------ + # Batch replay (default) + # ------------------------------------------------------------------ + + async def _batch_import_log_file(self, log_path: Path) -> ReplayResult: + id_map: dict[str, str] = {} + state = _ReplayState() + + for request_type, response_id, json_str in self._iter_log_data_lines(log_path): + await self._import_entry( + request_type, + response_id, + json_str, + simulate=True, + id_map=id_map, + state=state, + ) + + if state.report is None: raise ValueError("No CreateTestReport found in log file") - simulated_step_id_map: dict[str, str] = {} + real_id_map: dict[str, str] = {} - real_report = await self._create_report_from_simulated(simulated_report) + real_report = await self._create_report_from_simulated(state.report) real_report_id = real_report._id_or_error real_steps: list[TestStep] = [] - for sim_step_id in simulated_steps_order: - sim_step = simulated_steps_by_id[sim_step_id] + for sim_step_id in state.steps_order: + sim_step = state.steps_by_id[sim_step_id] real_parent_step_id = ( - simulated_step_id_map.get(sim_step.parent_step_id, sim_step.parent_step_id) + real_id_map.get(sim_step.parent_step_id, sim_step.parent_step_id) if sim_step.parent_step_id else None ) @@ -989,12 +1068,12 @@ async def import_log_file( ) real_step = await self.create_test_step(step_create) real_steps.append(real_step) - simulated_step_id_map[sim_step_id] = real_step._id_or_error + real_id_map[sim_step_id] = real_step._id_or_error real_measurements: list[TestMeasurement] = [] - for sim_measurement_id in simulated_measurements_order: - sim_measurement = simulated_measurements_by_id[sim_measurement_id] - real_step_id = simulated_step_id_map.get( + for sim_measurement_id in state.measurements_order: + sim_measurement = state.measurements_by_id[sim_measurement_id] + real_step_id = real_id_map.get( sim_measurement.test_step_id, sim_measurement.test_step_id ) measurement_create = self._measurement_create_from_simulated( @@ -1009,6 +1088,75 @@ async def import_log_file( measurements=real_measurements, ) + # ------------------------------------------------------------------ + # Incremental replay + # ------------------------------------------------------------------ + + async def _incremental_import_log_file(self, log_path: Path) -> ReplayResult: + """Replay line-by-line, issuing real API calls and updating tracking.""" + with open(log_path) as f: + first_line = f.readline() + tracking = LogTracking.from_log_line(first_line) if first_line else LogTracking() + + id_map = tracking.id_map + state = _ReplayState() + + for line_num, (request_type, response_id, json_str) in enumerate( + self._iter_log_data_lines(log_path), start=tracking.last_uploaded_line + 1 + ): + await self._import_entry( + request_type, + response_id, + json_str, + simulate=False, + id_map=id_map, + state=state, + ) + + tracking.last_uploaded_line = line_num + self._update_tracking(log_path, tracking) + + if state.report is None: + raise ValueError("No CreateTestReport found in log file") + + return ReplayResult( + report=state.report, + steps=[state.steps_by_id[sid] for sid in state.steps_order], + measurements=[state.measurements_by_id[mid] for mid in state.measurements_order], + ) + + # ------------------------------------------------------------------ + # Log line parsing helpers + # ------------------------------------------------------------------ + + @staticmethod + def _iter_log_data_lines( + log_path: Path, + ) -> Generator[tuple[str, str | None, str], None, None]: + """Parse data lines from a log file, skipping the LogTracking header. + + Yields (request_type, response_id, json_str) tuples. + The file is read entirely under a shared lock and then released + before yielding, so callers can safely acquire exclusive locks + during iteration (e.g. ``_update_tracking``). + """ + line_pattern = re.compile(r"^\[(\w+)(?::([^\]]+))?\]\s*(.+)$") + with open(log_path) as f: + fcntl.flock(f, fcntl.LOCK_SH) + raw_lines = f.readlines() + + for raw_line in raw_lines: + line = raw_line.strip() + if not line: + continue + match = line_pattern.match(line) + if not match: + raise ValueError(f"Invalid log line: {line}") + request_type = match.group(1) + if request_type == "LogTracking": + continue + yield (request_type, match.group(2), match.group(3)) + async def _create_report_from_simulated(self, simulated: TestReport) -> TestReport: """Create a real test report from a simulated one.""" report_create = TestReportCreate( @@ -1066,6 +1214,42 @@ def _measurement_create_from_simulated( string_expected_value=simulated.string_expected_value, ) + @staticmethod + def _log_request_to_file( + log_file: str | Path, + request_type: str, + request: Any, + response_id: str | None = None, + ) -> None: + """Log a request to a file in JSON format. + + Args: + log_file: Path to the log file. + request_type: Type of request being logged. + request: The protobuf request to log. + response_id: Optional ID from the simulated response, embedded in the tag + for create operations so replay can map previously simulated IDs used by simulated updates. + """ + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + tag = f"{request_type}:{response_id}" if response_id else request_type + with open(log_path, "a") as f: + request_dict = json_format.MessageToDict(request) + request_json = json.dumps(request_dict, separators=(",", ":")) + f.write(f"[{tag}] {request_json}\n") + + @staticmethod + def _update_tracking(log_file: str | Path, tracking: LogTracking) -> None: + """Rewrite the LogTracking header (line 0) in place.""" + log_path = Path(log_file) + with open(log_path, "r+") as f: + fcntl.flock(f, fcntl.LOCK_EX) + lines = f.readlines() + lines[0] = tracking.to_log_line() + f.seek(0) + f.writelines(lines) + f.truncate() + def _client_version() -> str: from importlib.metadata import PackageNotFoundError, version @@ -1076,6 +1260,46 @@ def _client_version() -> str: return "unknown" +@dataclass +class LogTracking: + """Tracking metadata stored as line 0 of a log file.""" + + last_uploaded_line: int = 0 + id_map: dict[str, str] = field(default_factory=dict) + client_version: str = field(default_factory=_client_version) + + def to_log_line(self) -> str: + data = { + "clientVersion": self.client_version, + "lastUploadedLine": self.last_uploaded_line, + "idMap": self.id_map, + } + return f"[LogTracking] {json.dumps(data, separators=(',', ':'))}\n" + + @staticmethod + def from_log_line(line: str) -> LogTracking: + match = re.match(r"^\[LogTracking\]\s*(.+)$", line.strip()) + if not match: + return LogTracking() + data = json.loads(match.group(1)) + return LogTracking( + last_uploaded_line=data.get("lastUploadedLine", 0), + id_map=data.get("idMap", {}), + client_version=data.get("clientVersion", "unknown"), + ) + + +@dataclass +class _ReplayState: + """Mutable state accumulated during log replay.""" + + report: TestReport | None = None + steps_by_id: dict[str, TestStep] = field(default_factory=dict) + steps_order: list[str] = field(default_factory=list) + measurements_by_id: dict[str, TestMeasurement] = field(default_factory=dict) + measurements_order: list[str] = field(default_factory=list) + + @dataclass class ReplayResult: """Result of replaying a log file.""" diff --git a/python/lib/sift_client/_tests/resources/test_test_results.py b/python/lib/sift_client/_tests/resources/test_test_results.py index d04609724..8612665dc 100644 --- a/python/lib/sift_client/_tests/resources/test_test_results.py +++ b/python/lib/sift_client/_tests/resources/test_test_results.py @@ -697,24 +697,23 @@ def test_import_log_file_round_trip(self, sift_client, nostromo_run, tmp_path): compare_test_measurement_fields(replayed_m, direct_m) @pytest.mark.asyncio - async def test_malformed_log_line_raises(self, tmp_path): - """import_log_file raises ValueError on a line that doesn't match the expected format.""" + async def test_malformed_log_line_skipped(self, tmp_path): + """Malformed lines are skipped; a file with no valid entries raises 'No CreateTestReport'.""" log_file = tmp_path / "bad.jsonl" - log_file.write_text("this is not a valid log line\n") + log_file.write_text( + '[LogTracking] {"lastUploadedLine":0,"idMap":{}}\nthis is not a valid log line\n' + ) client = TestResultsLowLevelClient(grpc_client=MagicMock()) - with pytest.raises(ValueError, match="malformed log line"): + with pytest.raises(ValueError, match="Invalid log line: this is not a valid log lin"): await client.import_log_file(log_file) @pytest.mark.asyncio - async def test_malformed_line_after_valid_lines_raises(self, tmp_path): - """A malformed line after valid entries still raises.""" - log_file = tmp_path / "mixed.jsonl" - log_file.write_text( - '[CreateTestReport] {"name":"r","testCase":"c","testSystemName":"s"}\n' - "totally broken line\n" - ) + async def test_empty_log_file_raises(self, tmp_path): + """A log file with only a LogTracking header and no entries raises.""" + log_file = tmp_path / "empty.jsonl" + log_file.write_text('[LogTracking] {"lastUploadedLine":0,"idMap":{}}\n') client = TestResultsLowLevelClient(grpc_client=MagicMock()) - with pytest.raises(ValueError, match="malformed log line"): + with pytest.raises(ValueError, match="No CreateTestReport found"): await client.import_log_file(log_file) diff --git a/python/lib/sift_client/_tests/util/conftest.py b/python/lib/sift_client/_tests/util/conftest.py index 3d8eb07fc..c8adf5687 100644 --- a/python/lib/sift_client/_tests/util/conftest.py +++ b/python/lib/sift_client/_tests/util/conftest.py @@ -1,43 +1,14 @@ -"""Override report_context to disable log file simulation for integration tests in this directory so that we can exercise the context manager when no log file is provided.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING, Generator - import pytest -from sift_client.util.test_results.pytest_util import _report_context_impl, _step_impl - -if TYPE_CHECKING: - from sift_client.client import SiftClient - from sift_client.util.test_results.context_manager import NewStep, ReportContext - - -@pytest.fixture(scope="session", autouse=True) -def report_context( - sift_client: SiftClient, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[ReportContext | None, None, None]: - if client_has_connection: - yield from _report_context_impl(sift_client, request, log_file=None) - else: - yield None - -@pytest.fixture(autouse=True) -def step( - report_context: ReportContext, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[NewStep | None, None, None]: - if client_has_connection: - yield from _step_impl(report_context, request) - else: - yield None +def pytest_addoption(parser: pytest.Parser) -> None: + existing_options = [opt.names() for opt in parser._anonymous.options] + # Flatten the list of lists into a single list of strings + flat_options = [item for sublist in existing_options for item in sublist] + if not any("--sift-test-results-log-file" in name for name in flat_options): + parser.addoption("--sift-test-results-log-file", action="store_true", default="false") -@pytest.fixture(scope="module", autouse=True) -def module_substep( - report_context: ReportContext, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[NewStep | None, None, None]: - if client_has_connection: - yield from _step_impl(report_context, request) - else: - yield None +def pytest_configure(config: pytest.Config) -> None: + """Configure the pytest configuration to disable the Sift test results log file.""" + config.option.sift_test_results_log_file = False diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index 62fe9d87a..241047cfa 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -2001,7 +2001,7 @@ class TestResultsAPI: """ ... - def import_log_file(self, log_file: str | Path) -> ReplayResult: + def import_log_file(self, log_file: str | Path, incremental: bool = False) -> ReplayResult: """Replay a log file by parsing each entry, simulating the results, then creating for real. This method reads a log file created by the simulation logging, reconstructs @@ -2009,7 +2009,8 @@ class TestResultsAPI: IDs are mapped from simulated to real during the creation process. Args: - log_file: Path to the log file to replay. + log_file: Path to the log file to import. + incremental: (internal tooling) If True, goes line by line and calls every event vs. reading the entire file at once and sending resultant test report. Returns: A ReplayResult containing the created report, steps, and measurements. diff --git a/python/lib/sift_client/resources/test_results.py b/python/lib/sift_client/resources/test_results.py index d739b4400..927ab27a2 100644 --- a/python/lib/sift_client/resources/test_results.py +++ b/python/lib/sift_client/resources/test_results.py @@ -619,6 +619,7 @@ async def delete_measurement(self, *, test_measurement: str | TestMeasurement) - async def import_log_file( self, log_file: str | Path, + incremental: bool = False, ) -> ReplayResult: """Replay a log file by parsing each entry, simulating the results, then creating for real. @@ -627,12 +628,13 @@ async def import_log_file( IDs are mapped from simulated to real during the creation process. Args: - log_file: Path to the log file to replay. + log_file: Path to the log file to import. + incremental: (internal tooling) If True, goes line by line and calls every event vs. reading the entire file at once and sending resultant test report. Returns: A ReplayResult containing the created report, steps, and measurements. """ - result = await self._low_level_client.import_log_file(log_file) + result = await self._low_level_client.import_log_file(log_file, incremental=incremental) result.report = self._apply_client_to_instance(result.report) result.steps = self._apply_client_to_instances(result.steps) result.measurements = self._apply_client_to_instances(result.measurements) diff --git a/python/lib/sift_client/scripts/import_test_result_log.py b/python/lib/sift_client/scripts/import_test_result_log.py index 7b01f4d29..78bf6e832 100644 --- a/python/lib/sift_client/scripts/import_test_result_log.py +++ b/python/lib/sift_client/scripts/import_test_result_log.py @@ -3,10 +3,45 @@ from __future__ import annotations import argparse +import logging import os +import select +import sys +import tempfile +from typing import TYPE_CHECKING from sift_client import SiftClient, SiftConnectionConfig +if TYPE_CHECKING: + from sift_client._internal.low_level_wrappers.test_results import ReplayResult + +logger = logging.getLogger(__name__) + + +def _print_result(result: ReplayResult) -> None: + print(f"Report: {result.report.name} (id={result.report.id_})") + print(f"Steps: {len(result.steps)}") + for step in result.steps: + print(f" - {step.step_path} [{step.status}]") + print(f"Measurements: {len(result.measurements)}") + for m in result.measurements: + print(f" - {m.name}: passed={m.passed}") + + +def _incremental_import_loop(client: SiftClient, log_file: str) -> ReplayResult | None: + """Replay incrementally in a loop until stdin is closed (EOF).""" + result = None + while True: + received_signal, _, _ = select.select([sys.stdin], [], [], 1.0) + result = client.test_results.import_log_file(log_file, incremental=True) + if received_signal: + break + logger.info(f"Replay completed: {result}") + fp = os.path.abspath(log_file) + if fp.startswith(tempfile.gettempdir()): + os.remove(fp) + return result + def main() -> None: """Replay a test result simulation log file against the Sift API.""" @@ -17,6 +52,9 @@ def main() -> None: parser.add_argument("--grpc-url", default=os.getenv("SIFT_GRPC_URI")) parser.add_argument("--rest-url", default=os.getenv("SIFT_REST_URI")) parser.add_argument("--api-key", default=os.getenv("SIFT_API_KEY")) + parser.add_argument( + "--incremental", action="store_true", help="Import the log file incrementally." + ) args = parser.parse_args() if not args.grpc_url or not args.rest_url or not args.api_key: @@ -33,15 +71,26 @@ def main() -> None: ) ) - result = client.test_results.import_log_file(args.log_file) + try: + if args.incremental: + result = _incremental_import_loop(client, args.log_file) + else: + result = client.test_results.import_log_file(args.log_file) + fp = os.path.abspath(args.log_file) + if fp.startswith(tempfile.gettempdir()): + os.remove(fp) + if result: + _print_result(result) + except Exception as e: + logger.error(e) + logger.error( + f"Error replaying log file: {args.log_file}.\n" + f" Can replay with `replay-test-result-log {args.log_file}`." + ) + raise - print(f"Report: {result.report.name} (id={result.report.id_})") - print(f"Steps: {len(result.steps)}") - for step in result.steps: - print(f" - {step.step_path} [{step.status}]") - print(f"Measurements: {len(result.measurements)}") - for m in result.measurements: - print(f" - {m.name}: passed={m.passed}") + if result: + _print_result(result) if __name__ == "__main__": diff --git a/python/lib/sift_client/util/test_results/__init__.py b/python/lib/sift_client/util/test_results/__init__.py index 6f80ce382..8af62e351 100644 --- a/python/lib/sift_client/util/test_results/__init__.py +++ b/python/lib/sift_client/util/test_results/__init__.py @@ -58,7 +58,12 @@ def main(self): - If you want each module(file) to be marked as a step w/ each test as a substep, import the `module_substep` fixture as well. - The `report_context` fixture requires a fixture `sift_client` returning an `SiftClient` instance to be passed in. -Note: FedRAMP users: report_context will log test results to a temp file to avoid API calls during test execution. If this is a shared environment, you should import the `report_context_no_logging` fixture instead. +Note: FedRAMP users: report_context will log test results to a temp file to avoid API calls during test execution. If this is a shared environment, you can disable logging by passing ``--sift-test-results-log-file=false``. + +#### Configuration + +- Git metadata: You can configure the test results by passing the `--sift-test-results-git-metadata` flag to pytest in your commandline, conftest.py file, or as an addopt in your pyproject.toml file (https://docs.pytest.org/en/stable/reference/customize.html#configuration). +- Log file: You can configure the log file by passing the `--sift-test-results-log-file` flag to pytest in your commandline, conftest.py file, or as an addopt in your pyproject.toml file. ###### Example at top of your test file or in your conftest.py file: @@ -101,10 +106,10 @@ def test_example(report_context, step): client_has_connection, module_substep, module_substep_check_connection, + pytest_addoption, pytest_runtest_makereport, report_context, report_context_check_connection, - report_context_no_logging, step, step_check_connection, ) @@ -115,10 +120,10 @@ def test_example(report_context, step): "client_has_connection", "module_substep", "module_substep_check_connection", + "pytest_addoption", "pytest_runtest_makereport", "report_context", "report_context_check_connection", - "report_context_no_logging", "step", "step_check_connection", ] diff --git a/python/lib/sift_client/util/test_results/context_manager.py b/python/lib/sift_client/util/test_results/context_manager.py index 05e6fc3d4..d7439e03f 100644 --- a/python/lib/sift_client/util/test_results/context_manager.py +++ b/python/lib/sift_client/util/test_results/context_manager.py @@ -4,6 +4,7 @@ import logging import os import socket +import subprocess import tempfile import traceback from contextlib import AbstractContextManager @@ -37,6 +38,29 @@ logger = logging.getLogger(__name__) +def _git_metadata() -> dict[str, str] | None: + """Return git branch and commit hash, or None if not in a git repo.""" + try: + branch = subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + commit = subprocess.check_output( + ["git", "rev-parse", "--short", "HEAD"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + repo = subprocess.check_output( + ["git", "remote", "get-url", "origin"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + return {"git_repo": repo, "git_branch": branch, "git_commit": commit} + except Exception: + return None + + class ReportContext(AbstractContextManager): """Context manager for a new TestReport. See usage example in __init__.py.""" @@ -48,6 +72,7 @@ class ReportContext(AbstractContextManager): step_number_at_depth: dict[int, int] open_step_results: dict[str, bool] any_failures: bool + _import_proc: subprocess.Popen | None = None def __init__( self, @@ -57,6 +82,7 @@ def __init__( system_operator: str | None = None, test_case: str | None = None, log_file: str | Path | bool | None = None, + include_git_metadata: bool = False, ): """Initialize a new report context. @@ -68,6 +94,7 @@ def __init__( test_case: The name of the test case. Will default to the basename of the file containing the test if not provided. log_file: If True, create a temp log file. If a path, use that path. All create/update operations will be logged to this file. + include_git_metadata: If True, include git metadata in the report. """ self.client = client self.step_is_open = False @@ -97,10 +124,31 @@ def __init__( end_time=datetime.now(timezone.utc), status=TestStatus.IN_PROGRESS, system_operator=system_operator, + metadata=_git_metadata() if include_git_metadata else None, # type: ignore ) self.report = client.test_results.create(create, log_file=self.log_file) + def _open_import_proc(self): + """Open a subprocess to import the log file.""" + # To avoid GRPC forking errors, temporarily redirect stderr at the fd level before forking, so the child inherits /dev/null on fd 2 when the atfork handler fires. + saved_stderr = os.dup(2) + devnull_fd = os.open(os.devnull, os.O_WRONLY) + os.dup2(devnull_fd, 2) + os.close(devnull_fd) + try: + self._import_proc = subprocess.Popen( + ["import-test-result-log", "--incremental", str(self.log_file)], + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + finally: + os.dup2(saved_stderr, 2) + os.close(saved_stderr) + def __enter__(self): + if self.log_file: + self._open_import_proc() return self def __exit__(self, exc_type, exc_value, traceback): @@ -112,19 +160,14 @@ def __exit__(self, exc_type, exc_value, traceback): else: update["status"] = TestStatus.PASSED self.report.update(update, log_file=self.log_file) - if self.log_file: + + if self._import_proc is not None: try: - # Try replaying the log file and clean up the file if it's a temporary file. - self.client.test_results.import_log_file(self.log_file) - fp = os.path.abspath(self.log_file) - tmp_dir = tempfile.gettempdir() - if fp.startswith(tmp_dir): - os.remove(fp) - except Exception as e: - logger.error(e) - logger.error( - f"Error replaying log file: {self.log_file}.\n Can replay with `import-test-result-log {self.log_file}`." - ) + self._import_proc.communicate(timeout=10) + except subprocess.TimeoutExpired: + logger.error("Import process did not exit in 10s, killing it") + self._import_proc.kill() + self._import_proc.wait() raise return True diff --git a/python/lib/sift_client/util/test_results/pytest_util.py b/python/lib/sift_client/util/test_results/pytest_util.py index afd6e34b7..645b6c047 100644 --- a/python/lib/sift_client/util/test_results/pytest_util.py +++ b/python/lib/sift_client/util/test_results/pytest_util.py @@ -16,6 +16,39 @@ REPORT_CONTEXT: ReportContext | None = None +def pytest_addoption(parser: pytest.Parser) -> None: + """Register Sift-specific command-line options.""" + parser.addoption( + "--sift-test-results-log-file", + default=None, + help="Path to write the Sift test result log file. " + "Use 'true' (default) to auto-create a temp file, " + "False, 'false', or 'none' to disable logging, " + "or a file path to write to a specific location.", + ) + parser.addoption( + "--sift-test-results-git-metadata", + action="store_true", + default=True, + help="Include git metadata in the Sift test results.", + ) + + +def _resolve_log_file(pytestconfig: pytest.Config | None) -> str | Path | bool | None: + """Determine log_file value from --sift-test-results-log-file option.""" + raw = None + if pytestconfig is not None: + raw = pytestconfig.getoption("--sift-test-results-log-file", default=None) + if raw is None: + return True + lower = str(raw).lower() + if lower in ("true", "1"): + return True + if lower in ("false", "none"): + return None + return Path(raw) + + @pytest.hookimpl(tryfirst=True, hookwrapper=True) def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo[Any]): """You should import this hook to capture any AssertionErrors that occur during the test. If not included, any assert failures in a test will not automatically fail the step.""" @@ -34,7 +67,7 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo[Any]): def _report_context_impl( sift_client: SiftClient, request: pytest.FixtureRequest, - log_file: str | Path | bool | None = True, + pytestconfig: pytest.Config | None = None, ) -> Generator[ReportContext | None, None, None]: test_path = Path(request.config.invocation_params.args[0]) base_name = ( @@ -43,11 +76,18 @@ def _report_context_impl( else "pytest " + " ".join(request.config.invocation_params.args) ) test_case = test_path if test_path.exists() else base_name + log_file = _resolve_log_file(pytestconfig) + include_git_metadata = ( + bool(pytestconfig.getoption("--sift-test-results-git-metadata", default=False)) + if pytestconfig + else False + ) with ReportContext( sift_client, name=f"{base_name} {datetime.now(timezone.utc).isoformat()}", test_case=str(test_case), log_file=log_file, + include_git_metadata=include_git_metadata, ) as context: # Set a global so we can access this in pytest hooks. global REPORT_CONTEXT @@ -57,18 +97,14 @@ def _report_context_impl( @pytest.fixture(scope="session", autouse=True) def report_context( - sift_client: SiftClient, request: pytest.FixtureRequest + sift_client: SiftClient, request: pytest.FixtureRequest, pytestconfig: pytest.Config ) -> Generator[ReportContext | None, None, None]: - """Create a report context for the session.""" - yield from _report_context_impl(sift_client, request, log_file=True) + """Create a report context for the session. - -@pytest.fixture(scope="session", autouse=True) -def report_context_no_logging( - sift_client: SiftClient, request: pytest.FixtureRequest -) -> Generator[ReportContext | None, None, None]: - """Create a report context for the session with logging disabled.""" - yield from _report_context_impl(sift_client, request, log_file=None) + The log file destination is controlled by ``--sift-test-results-log-file``. + Defaults to a temp file when not set. + """ + yield from _report_context_impl(sift_client, request, pytestconfig=pytestconfig) def _step_impl( @@ -127,11 +163,14 @@ def client_has_connection(sift_client): @pytest.fixture(scope="session", autouse=True) def report_context_check_connection( - sift_client: SiftClient, client_has_connection: bool, request: pytest.FixtureRequest + sift_client: SiftClient, + client_has_connection: bool, + request: pytest.FixtureRequest, + pytestconfig: pytest.Config, ) -> Generator[ReportContext | None, None, None]: """Create a report context for the session. Doesn't run if the client has no connection to the Sift server.""" if client_has_connection: - yield from _report_context_impl(sift_client, request) + yield from _report_context_impl(sift_client, request, pytestconfig=pytestconfig) else: yield None From 65394c36b2c445f8c69c01215422ce3be3b6ecb4 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Tue, 14 Apr 2026 16:37:24 -0700 Subject: [PATCH 2/9] config notes --- python/lib/sift_client/_tests/conftest.py | 1 + python/lib/sift_client/util/test_results/__init__.py | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/python/lib/sift_client/_tests/conftest.py b/python/lib/sift_client/_tests/conftest.py index 1d44bb437..e50b08512 100644 --- a/python/lib/sift_client/_tests/conftest.py +++ b/python/lib/sift_client/_tests/conftest.py @@ -79,6 +79,7 @@ def ci_pytest_tag(sift_client): from sift_client.util.test_results import ( + pytest_addoption, # noqa: F401 client_has_connection, # noqa: F401 pytest_runtest_makereport, # noqa: F401 ) diff --git a/python/lib/sift_client/util/test_results/__init__.py b/python/lib/sift_client/util/test_results/__init__.py index 8af62e351..ca953a4c6 100644 --- a/python/lib/sift_client/util/test_results/__init__.py +++ b/python/lib/sift_client/util/test_results/__init__.py @@ -62,8 +62,10 @@ def main(self): #### Configuration -- Git metadata: You can configure the test results by passing the `--sift-test-results-git-metadata` flag to pytest in your commandline, conftest.py file, or as an addopt in your pyproject.toml file (https://docs.pytest.org/en/stable/reference/customize.html#configuration). -- Log file: You can configure the log file by passing the `--sift-test-results-log-file` flag to pytest in your commandline, conftest.py file, or as an addopt in your pyproject.toml file. +Import the `pytest_addoption` function to add configuration options for Test Results or add the options to your pyproject.toml file (https://docs.pytest.org/en/stable/reference/customize.html#configuration). + +- Git metadata: Include git metadata(repo, branch, commit) in the test results. Default is False. You can configure the test results by passing the `--sift-test-results-git-metadata` . +- Log file: Write test results to a file. This happens automatically but you can configure specify a specific log file by passing `--sift-test-results-log-file=` or disable logging by passing `--sift-test-results-log-file=false`. ###### Example at top of your test file or in your conftest.py file: @@ -80,7 +82,7 @@ def sift_client() -> SiftClient: return client -from sift_client.util.test_results import pytest_runtest_makereport, report_context, step, module_substep +from sift_client.util.test_results import pytest_addoption, pytest_runtest_makereport, report_context, step, module_substep ``` ###### Then in your test file: From fe8eff439cd87990963a8bd690b365d7a90e6465 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Wed, 15 Apr 2026 10:05:40 -0700 Subject: [PATCH 3/9] lint --- python/lib/sift_client/_tests/conftest.py | 2 +- python/lib/sift_client/util/test_results/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/lib/sift_client/_tests/conftest.py b/python/lib/sift_client/_tests/conftest.py index e50b08512..cf71728d3 100644 --- a/python/lib/sift_client/_tests/conftest.py +++ b/python/lib/sift_client/_tests/conftest.py @@ -79,8 +79,8 @@ def ci_pytest_tag(sift_client): from sift_client.util.test_results import ( - pytest_addoption, # noqa: F401 client_has_connection, # noqa: F401 + pytest_addoption, # noqa: F401 pytest_runtest_makereport, # noqa: F401 ) from sift_client.util.test_results import ( diff --git a/python/lib/sift_client/util/test_results/__init__.py b/python/lib/sift_client/util/test_results/__init__.py index ca953a4c6..ae9a04430 100644 --- a/python/lib/sift_client/util/test_results/__init__.py +++ b/python/lib/sift_client/util/test_results/__init__.py @@ -62,7 +62,7 @@ def main(self): #### Configuration -Import the `pytest_addoption` function to add configuration options for Test Results or add the options to your pyproject.toml file (https://docs.pytest.org/en/stable/reference/customize.html#configuration). +Import the `pytest_addoption` function to add configuration options for Test Results to the commandline or add the options to your pyproject.toml file (https://docs.pytest.org/en/stable/reference/customize.html#configuration). If ommitted, will use the default values described below. - Git metadata: Include git metadata(repo, branch, commit) in the test results. Default is False. You can configure the test results by passing the `--sift-test-results-git-metadata` . - Log file: Write test results to a file. This happens automatically but you can configure specify a specific log file by passing `--sift-test-results-log-file=` or disable logging by passing `--sift-test-results-log-file=false`. From bcfc855bffcb9ccae69fc1d8cd2aa3f209d3fd37 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Thu, 16 Apr 2026 10:43:19 -0700 Subject: [PATCH 4/9] simpler import example --- python/lib/sift_client/_tests/conftest.py | 16 ++-------------- .../sift_client/util/test_results/__init__.py | 2 +- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/python/lib/sift_client/_tests/conftest.py b/python/lib/sift_client/_tests/conftest.py index cf71728d3..06c41e711 100644 --- a/python/lib/sift_client/_tests/conftest.py +++ b/python/lib/sift_client/_tests/conftest.py @@ -78,17 +78,5 @@ def ci_pytest_tag(sift_client): return tag -from sift_client.util.test_results import ( - client_has_connection, # noqa: F401 - pytest_addoption, # noqa: F401 - pytest_runtest_makereport, # noqa: F401 -) -from sift_client.util.test_results import ( - module_substep_check_connection as module_substep, # noqa: F401 -) -from sift_client.util.test_results import ( - report_context_check_connection as report_context, # noqa: F401 -) -from sift_client.util.test_results import ( - step_check_connection as step, # noqa: F401 -) +# Import all test results fixtures the way we expect customers to do. +from sift_client.util.test_results import * # noqa: F403 \ No newline at end of file diff --git a/python/lib/sift_client/util/test_results/__init__.py b/python/lib/sift_client/util/test_results/__init__.py index ae9a04430..839de0617 100644 --- a/python/lib/sift_client/util/test_results/__init__.py +++ b/python/lib/sift_client/util/test_results/__init__.py @@ -82,7 +82,7 @@ def sift_client() -> SiftClient: return client -from sift_client.util.test_results import pytest_addoption, pytest_runtest_makereport, report_context, step, module_substep +from sift_client.util.test_results import * ``` ###### Then in your test file: From d6d724e617eb533e2274e9deb7fb27ae8536594d Mon Sep 17 00:00:00 2001 From: Ian Later Date: Thu, 16 Apr 2026 13:43:37 -0700 Subject: [PATCH 5/9] lint --- python/lib/sift_client/_tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/sift_client/_tests/conftest.py b/python/lib/sift_client/_tests/conftest.py index 06c41e711..eda17e1c1 100644 --- a/python/lib/sift_client/_tests/conftest.py +++ b/python/lib/sift_client/_tests/conftest.py @@ -79,4 +79,4 @@ def ci_pytest_tag(sift_client): # Import all test results fixtures the way we expect customers to do. -from sift_client.util.test_results import * # noqa: F403 \ No newline at end of file +from sift_client.util.test_results import * # noqa: F403 From ac537e29cc5d887163bcf909fa9d8f7e9df0fb52 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Thu, 16 Apr 2026 15:36:41 -0700 Subject: [PATCH 6/9] undo conftest change. we need check connection fixtures for unit tests --- python/lib/sift_client/_tests/conftest.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/python/lib/sift_client/_tests/conftest.py b/python/lib/sift_client/_tests/conftest.py index eda17e1c1..cf71728d3 100644 --- a/python/lib/sift_client/_tests/conftest.py +++ b/python/lib/sift_client/_tests/conftest.py @@ -78,5 +78,17 @@ def ci_pytest_tag(sift_client): return tag -# Import all test results fixtures the way we expect customers to do. -from sift_client.util.test_results import * # noqa: F403 +from sift_client.util.test_results import ( + client_has_connection, # noqa: F401 + pytest_addoption, # noqa: F401 + pytest_runtest_makereport, # noqa: F401 +) +from sift_client.util.test_results import ( + module_substep_check_connection as module_substep, # noqa: F401 +) +from sift_client.util.test_results import ( + report_context_check_connection as report_context, # noqa: F401 +) +from sift_client.util.test_results import ( + step_check_connection as step, # noqa: F401 +) From d2f965d9d2ad48ac94993cbdf1d583a49255543c Mon Sep 17 00:00:00 2001 From: Ian Later Date: Mon, 20 Apr 2026 14:48:17 -0700 Subject: [PATCH 7/9] pr fb. pyaddopts rework. logging util file and functions. --- .../low_level_wrappers/_test_results_log.py | 201 ++++++++ .../low_level_wrappers/test_results.py | 442 ++++++++---------- python/lib/sift_client/_tests/conftest.py | 21 +- .../_tests/resources/test_test_results.py | 53 +++ .../lib/sift_client/_tests/util/conftest.py | 2 +- .../scripts/import_test_result_log.py | 8 +- .../sift_client/util/test_results/__init__.py | 9 +- .../util/test_results/context_manager.py | 79 ++-- .../util/test_results/pytest_util.py | 114 ++--- 9 files changed, 583 insertions(+), 346 deletions(-) create mode 100644 python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py diff --git a/python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py b/python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py new file mode 100644 index 000000000..d9f9dd4df --- /dev/null +++ b/python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py @@ -0,0 +1,201 @@ +"""Internal log-format primitives for test-result simulation logs. + +Houses the file-format pieces that used to live inline in ``test_results.py``: + +* Dataclasses describing the log header (``LogTracking``) and the intermediate + state accumulated while replaying a log (``_ReplayState``, ``ReplayResult``). +* Pure functions for writing log entries, rewriting the tracking header, and + parsing data lines. + +This module has no dependency on the low-level gRPC client; the replay +orchestration still lives on ``TestResultsLowLevelClient`` and uses these +helpers. +""" + +from __future__ import annotations + +import fcntl +import json +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Any, Generator + +from google.protobuf import json_format + +if TYPE_CHECKING: + from sift_client.sift_types.test_report import TestMeasurement, TestReport, TestStep + + +def _client_version() -> str: + from importlib.metadata import PackageNotFoundError, version + + try: + return version("sift_stack_py") + except PackageNotFoundError: + return "unknown" + + +@dataclass +class LogTracking: + """Tracking metadata stored as line 0 of a log file. + + ``last_uploaded_line`` is the count of data lines (i.e. non-header lines) that + have been successfully uploaded. Each data line corresponds to a single API + call, so line granularity matches the atomic unit of work: a line is either + fully replayed or must be retried in its entirety. Data lines are strictly + append-only, so this counter is stable across header rewrites. + """ + + last_uploaded_line: int = 0 + id_map: dict[str, str] = field(default_factory=dict) + client_version: str = field(default_factory=_client_version) + + def to_log_line(self) -> str: + data = { + "clientVersion": self.client_version, + "lastUploadedLine": self.last_uploaded_line, + "idMap": self.id_map, + } + return f"[LogTracking] {json.dumps(data, separators=(',', ':'))}\n" + + @staticmethod + def from_log_line(line: str) -> LogTracking: + match = re.match(r"^\[LogTracking\]\s*(.+)$", line.strip()) + if not match: + return LogTracking() + data = json.loads(match.group(1)) + return LogTracking( + last_uploaded_line=data.get("lastUploadedLine", 0), + id_map=data.get("idMap", {}), + client_version=data.get("clientVersion", "unknown"), + ) + + +@dataclass +class _ReplayState: + """Mutable state accumulated during log replay.""" + + report: TestReport | None = None + steps_by_id: dict[str, TestStep] = field(default_factory=dict) + steps_order: list[str] = field(default_factory=list) + measurements_by_id: dict[str, TestMeasurement] = field(default_factory=dict) + measurements_order: list[str] = field(default_factory=list) + + +@dataclass +class ReplayResult: + """Result of replaying a log file.""" + + report: TestReport + steps: list[TestStep] = field(default_factory=list) + measurements: list[TestMeasurement] = field(default_factory=list) + + +# Concurrency +# ----------------- +# A test-result log file is written by the test process (:func:`log_request_to_file`) +# while the ``import-test-result-log --incremental`` subprocess concurrently reads it +# (:func:`iter_log_data_lines`) and rewrites its header line (:func:`update_tracking`). +# +# All three functions synchronize via ``fcntl.flock`` on an exclusive file lock + + +def log_request_to_file( + log_file: str | Path, + request_type: str, + request: Any, + response_id: str | None = None, +) -> None: + """Append a request as a JSON-encoded line to ``log_file``. + + Holds ``LOCK_EX`` across the append so the incremental importer's + :func:`update_tracking` rewrite cannot race with it. See the module docstring + above for the full concurrency contract. + + Args: + log_file: Path to the log file. + request_type: Type of request being logged. + request: The protobuf request to log. + response_id: Optional ID from the simulated response, embedded in the tag + for create operations so replay can map previously simulated IDs used + by simulated updates. + """ + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + tag = f"{request_type}:{response_id}" if response_id else request_type + request_dict = json_format.MessageToDict(request) + request_json = json.dumps(request_dict, separators=(",", ":")) + line = f"[{tag}] {request_json}\n" + with open(log_path, "a") as f: + fcntl.flock(f, fcntl.LOCK_EX) + # Closing the file flushes and releases the flock atomically; no explicit + # unlock needed here. + f.write(line) + + +def update_tracking(log_file: str | Path, tracking: LogTracking) -> None: + """Write the LogTracking header as line 0, creating it if missing. + + Holds ``LOCK_EX`` across the entire read-rewrite-truncate cycle so concurrent + :func:`log_request_to_file` appends cannot slip in between ``readlines()`` and + ``truncate()``; re-reads inside the lock so any lines appended since the last + tick are preserved when rewriting. See the module docstring above. + + If the file already has a ``[LogTracking]`` header on line 0, it is replaced + in place. Otherwise the header is inserted as a new line 0 (so we don't + clobber an existing data line). + """ + log_path = Path(log_file) + new_header = tracking.to_log_line() + with open(log_path, "r+") as f: + fcntl.flock(f, fcntl.LOCK_EX) + lines = f.readlines() + if lines and lines[0].startswith("[LogTracking]"): + lines[0] = new_header + else: + lines.insert(0, new_header) + f.seek(0) + f.writelines(lines) + f.truncate() + + +def iter_log_data_lines( + log_path: Path, + start_line: int = 0, +) -> Generator[tuple[str, str | None, str], None, None]: + """Parse data lines from a log file, skipping the LogTracking header. + + Yields ``(request_type, response_id, json_str)`` tuples. Each yielded item + corresponds to one logged API call. + + ``start_line`` is the count of data lines (1-based) already uploaded; the + iterator skips the first ``start_line`` data lines and yields the rest. + Pass 0 to read all data lines. + + Acquires ``LOCK_SH`` only while snapshotting the file into memory, then + releases before yielding so callers can take ``LOCK_EX`` during iteration + (e.g. for :func:`update_tracking`). Any lines appended by a concurrent + :func:`log_request_to_file` call after the snapshot are not visible this + call -- they will be picked up on the next invocation. + """ + line_pattern = re.compile(r"^\[(\w+)(?::([^\]]+))?\]\s*(.+)$") + with open(log_path) as f: + fcntl.flock(f, fcntl.LOCK_SH) + raw_lines = f.readlines() + + data_line_count = 0 + for raw_line in raw_lines: + line = raw_line.strip() + if not line: + continue + match = line_pattern.match(line) + if not match: + raise ValueError(f"Invalid log line: {line}") + request_type = match.group(1) + if request_type == "LogTracking": + continue + data_line_count += 1 + if data_line_count <= start_line: + continue + yield (request_type, match.group(2), match.group(3)) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py index e9d871e4d..345f6b1c2 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py @@ -1,13 +1,9 @@ from __future__ import annotations -import fcntl -import json import logging -import re import uuid -from dataclasses import dataclass, field from pathlib import Path -from typing import TYPE_CHECKING, Any, Generator, cast +from typing import TYPE_CHECKING, Any, cast from google.protobuf import json_format from sift.test_reports.v1.test_reports_pb2 import ( @@ -44,6 +40,14 @@ from sift.test_reports.v1.test_reports_pb2 import TestStep as TestStepProto from sift.test_reports.v1.test_reports_pb2_grpc import TestReportServiceStub +from sift_client._internal.low_level_wrappers._test_results_log import ( + LogTracking, + ReplayResult, + _ReplayState, + iter_log_data_lines, + log_request_to_file, + update_tracking, +) from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase from sift_client.sift_types.test_report import ( TestMeasurement, @@ -367,7 +371,7 @@ async def create_test_report( if log_file is not None or simulate: simulated_proto = self.simulate_create_test_report_response(request) if log_file is not None: - self._log_request_to_file( + log_request_to_file( log_file, "CreateTestReport", request, @@ -490,7 +494,7 @@ async def update_test_report( if log_file is not None or simulate: if log_file is not None: - self._log_request_to_file(log_file, "UpdateTestReport", request) + log_request_to_file(log_file, "UpdateTestReport", request) return self.simulate_update_test_report_response(request, existing=existing) response = await self._grpc_client.get_stub(TestReportServiceStub).UpdateTestReport(request) @@ -540,7 +544,7 @@ async def create_test_step( if log_file is not None or simulate: simulated_proto = self.simulate_create_test_step_response(request) if log_file is not None: - self._log_request_to_file( + log_request_to_file( log_file, "CreateTestStep", request, @@ -646,7 +650,7 @@ async def update_test_step( if log_file is not None or simulate: if log_file is not None: - self._log_request_to_file(log_file, "UpdateTestStep", request) + log_request_to_file(log_file, "UpdateTestStep", request) return self.simulate_update_test_step_response(request, existing=existing) response = await self._grpc_client.get_stub(TestReportServiceStub).UpdateTestStep(request) @@ -696,7 +700,7 @@ async def create_test_measurement( if log_file is not None or simulate: simulated_proto = self.simulate_create_test_measurement_response(request) if log_file is not None: - self._log_request_to_file( + log_request_to_file( log_file, "CreateTestMeasurement", request, @@ -737,7 +741,7 @@ async def create_test_measurements( if log_file is not None or simulate: count, measurement_ids = self.simulate_create_test_measurements_response(request) if log_file is not None: - self._log_request_to_file( + log_request_to_file( log_file, "CreateTestMeasurements", request, @@ -846,7 +850,7 @@ async def update_test_measurement( if log_file is not None or simulate: if log_file is not None: - self._log_request_to_file(log_file, "UpdateTestMeasurement", request) + log_request_to_file(log_file, "UpdateTestMeasurement", request) return self.simulate_update_test_measurement_response(request, existing=existing) response = await self._grpc_client.get_stub(TestReportServiceStub).UpdateTestMeasurement( @@ -884,8 +888,9 @@ async def import_log_file( ``LogTracking`` header on line 0 is ignored. * **incremental** (``incremental=True``): Walk the log line-by-line, issuing the real API call for each entry as it is encountered. - ``LogTracking.last_uploaded_line`` is updated after every successful - call so that a subsequent invocation skips already-uploaded lines. + ``LogTracking.last_uploaded_line`` is advanced only after the call + succeeds, so a failure during a line causes the entire line to be + retried on the next invocation; already-uploaded lines are skipped. Args: log_file: Path to the log file to replay. @@ -924,110 +929,176 @@ async def _import_entry( *id_map* is updated so that subsequent entries can remap IDs that were generated during the original simulation run. """ + handlers = { + "CreateTestReport": self._replay_create_report, + "CreateTestStep": self._replay_create_step, + "CreateTestMeasurement": self._replay_create_measurement, + "CreateTestMeasurements": self._replay_create_measurements, + "UpdateTestReport": self._replay_update_report, + "UpdateTestStep": self._replay_update_step, + "UpdateTestMeasurement": self._replay_update_measurement, + } + handler = handlers.get(request_type) + if handler is None: + return + await handler(json_str, response_id, simulate=simulate, id_map=id_map, state=state) - def _map_id(sid: str) -> str: - return id_map.get(sid, sid) + @staticmethod + def _map_id(id_map: dict[str, str], sid: str) -> str: + """Translate a simulated ID to its real counterpart, or return *sid* unchanged.""" + return id_map.get(sid, sid) - if request_type == "CreateTestReport": - create_report_req = CreateTestReportRequest() - json_format.Parse(json_str, create_report_req) - state.report = await self.create_test_report( - request=create_report_req, simulate=simulate - ) - if response_id: - id_map[response_id] = state.report._id_or_error - - elif request_type == "CreateTestStep": - create_step_req = CreateTestStepRequest() - json_format.Parse(json_str, create_step_req) - create_step_req.test_step.test_report_id = _map_id( - create_step_req.test_step.test_report_id - ) - if create_step_req.test_step.parent_step_id: - create_step_req.test_step.parent_step_id = _map_id( - create_step_req.test_step.parent_step_id - ) - step = await self.create_test_step(request=create_step_req, simulate=simulate) - if response_id: - id_map[response_id] = step._id_or_error - state.steps_by_id[step._id_or_error] = step - state.steps_order.append(step._id_or_error) - - elif request_type == "CreateTestMeasurement": - create_meas_req = CreateTestMeasurementRequest() - json_format.Parse(json_str, create_meas_req) - create_meas_req.test_measurement.test_step_id = _map_id( - create_meas_req.test_measurement.test_step_id - ) - measurement = await self.create_test_measurement( - request=create_meas_req, simulate=simulate - ) - if response_id: - id_map[response_id] = measurement._id_or_error - state.measurements_by_id[measurement._id_or_error] = measurement - state.measurements_order.append(measurement._id_or_error) - - elif request_type == "CreateTestMeasurements": - create_batch_req = CreateTestMeasurementsRequest() - json_format.Parse(json_str, create_batch_req) - for tm in create_batch_req.test_measurements: - tm.test_step_id = _map_id(tm.test_step_id) - original_ids = response_id.split(",") if response_id else [] - if simulate: - for i, tm_proto in enumerate(create_batch_req.test_measurements): - single_req = CreateTestMeasurementRequest(test_measurement=tm_proto) - meas = await self.create_test_measurement(request=single_req, simulate=True) - if i < len(original_ids): - id_map[original_ids[i]] = meas._id_or_error - state.measurements_by_id[meas._id_or_error] = meas - state.measurements_order.append(meas._id_or_error) - else: - _, real_ids = await self.create_test_measurements(request=create_batch_req) - for i, real_id in enumerate(real_ids): - if i < len(original_ids): - id_map[original_ids[i]] = real_id - - elif request_type == "UpdateTestReport": - if state.report is None: - raise ValueError("UpdateTestReport found before CreateTestReport") - update_report_req = UpdateTestReportRequest() - json_format.Parse(json_str, update_report_req) - update_report_req.test_report.test_report_id = _map_id( - update_report_req.test_report.test_report_id - ) - state.report = await self.update_test_report( - request=update_report_req, simulate=simulate, existing=state.report - ) + async def _replay_create_report( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = CreateTestReportRequest() + json_format.Parse(json_str, request) + state.report = await self.create_test_report(request=request, simulate=simulate) + if response_id: + id_map[response_id] = state.report._id_or_error - elif request_type == "UpdateTestStep": - update_step_req = UpdateTestStepRequest() - json_format.Parse(json_str, update_step_req) - orig_step_id = update_step_req.test_step.test_step_id - mapped_step_id = _map_id(orig_step_id) - update_step_req.test_step.test_step_id = mapped_step_id - existing_step = state.steps_by_id.get(mapped_step_id) - if simulate and existing_step is None: - raise ValueError(f"UpdateTestStep for unknown step: {orig_step_id}") - updated_step = await self.update_test_step( - request=update_step_req, simulate=simulate, existing=existing_step - ) - if mapped_step_id in state.steps_by_id: - state.steps_by_id[mapped_step_id] = updated_step - - elif request_type == "UpdateTestMeasurement": - update_meas_req = UpdateTestMeasurementRequest() - json_format.Parse(json_str, update_meas_req) - orig_meas_id = update_meas_req.test_measurement.measurement_id - mapped_meas_id = _map_id(orig_meas_id) - update_meas_req.test_measurement.measurement_id = mapped_meas_id - existing_meas = state.measurements_by_id.get(mapped_meas_id) - if simulate and existing_meas is None: - raise ValueError(f"UpdateTestMeasurement for unknown measurement: {orig_meas_id}") - updated_meas = await self.update_test_measurement( - request=update_meas_req, simulate=simulate, existing=existing_meas + async def _replay_create_step( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = CreateTestStepRequest() + json_format.Parse(json_str, request) + request.test_step.test_report_id = self._map_id(id_map, request.test_step.test_report_id) + if request.test_step.parent_step_id: + request.test_step.parent_step_id = self._map_id( + id_map, request.test_step.parent_step_id ) - if mapped_meas_id in state.measurements_by_id: - state.measurements_by_id[mapped_meas_id] = updated_meas + step = await self.create_test_step(request=request, simulate=simulate) + if response_id: + id_map[response_id] = step._id_or_error + state.steps_by_id[step._id_or_error] = step + state.steps_order.append(step._id_or_error) + + async def _replay_create_measurement( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = CreateTestMeasurementRequest() + json_format.Parse(json_str, request) + request.test_measurement.test_step_id = self._map_id( + id_map, request.test_measurement.test_step_id + ) + measurement = await self.create_test_measurement(request=request, simulate=simulate) + if response_id: + id_map[response_id] = measurement._id_or_error + state.measurements_by_id[measurement._id_or_error] = measurement + state.measurements_order.append(measurement._id_or_error) + + async def _replay_create_measurements( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = CreateTestMeasurementsRequest() + json_format.Parse(json_str, request) + for tm in request.test_measurements: + tm.test_step_id = self._map_id(id_map, tm.test_step_id) + original_ids = response_id.split(",") if response_id else [] + if simulate: + # Batch endpoint has no simulate path; fan out to per-measurement simulate calls. + for i, tm_proto in enumerate(request.test_measurements): + single_req = CreateTestMeasurementRequest(test_measurement=tm_proto) + meas = await self.create_test_measurement(request=single_req, simulate=True) + if i < len(original_ids): + id_map[original_ids[i]] = meas._id_or_error + state.measurements_by_id[meas._id_or_error] = meas + state.measurements_order.append(meas._id_or_error) + else: + _, real_ids = await self.create_test_measurements(request=request) + for i, real_id in enumerate(real_ids): + if i < len(original_ids): + id_map[original_ids[i]] = real_id + + async def _replay_update_report( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + if state.report is None: + raise ValueError("UpdateTestReport found before CreateTestReport") + request = UpdateTestReportRequest() + json_format.Parse(json_str, request) + request.test_report.test_report_id = self._map_id( + id_map, request.test_report.test_report_id + ) + state.report = await self.update_test_report( + request=request, simulate=simulate, existing=state.report + ) + + async def _replay_update_step( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = UpdateTestStepRequest() + json_format.Parse(json_str, request) + orig_step_id = request.test_step.test_step_id + mapped_step_id = self._map_id(id_map, orig_step_id) + request.test_step.test_step_id = mapped_step_id + existing_step = state.steps_by_id.get(mapped_step_id) + if simulate and existing_step is None: + raise ValueError(f"UpdateTestStep for unknown step: {orig_step_id}") + updated_step = await self.update_test_step( + request=request, simulate=simulate, existing=existing_step + ) + if mapped_step_id in state.steps_by_id: + state.steps_by_id[mapped_step_id] = updated_step + + async def _replay_update_measurement( + self, + json_str: str, + response_id: str | None, + *, + simulate: bool, + id_map: dict[str, str], + state: _ReplayState, + ) -> None: + request = UpdateTestMeasurementRequest() + json_format.Parse(json_str, request) + orig_meas_id = request.test_measurement.measurement_id + mapped_meas_id = self._map_id(id_map, orig_meas_id) + request.test_measurement.measurement_id = mapped_meas_id + existing_meas = state.measurements_by_id.get(mapped_meas_id) + if simulate and existing_meas is None: + raise ValueError(f"UpdateTestMeasurement for unknown measurement: {orig_meas_id}") + updated_meas = await self.update_test_measurement( + request=request, simulate=simulate, existing=existing_meas + ) + if mapped_meas_id in state.measurements_by_id: + state.measurements_by_id[mapped_meas_id] = updated_meas # ------------------------------------------------------------------ # Batch replay (default) @@ -1037,7 +1108,7 @@ async def _batch_import_log_file(self, log_path: Path) -> ReplayResult: id_map: dict[str, str] = {} state = _ReplayState() - for request_type, response_id, json_str in self._iter_log_data_lines(log_path): + for request_type, response_id, json_str in iter_log_data_lines(log_path): await self._import_entry( request_type, response_id, @@ -1093,7 +1164,13 @@ async def _batch_import_log_file(self, log_path: Path) -> ReplayResult: # ------------------------------------------------------------------ async def _incremental_import_log_file(self, log_path: Path) -> ReplayResult: - """Replay line-by-line, issuing real API calls and updating tracking.""" + """Replay line-by-line, issuing real API calls and updating tracking. + + Resumes from ``LogTracking.last_uploaded_line`` so already-uploaded + entries are skipped on subsequent ticks rather than re-sent to the server. + Each data line is a single atomic API call; if replay of a line fails, + ``last_uploaded_line`` is not advanced so the whole line is retried next tick. + """ with open(log_path) as f: first_line = f.readline() tracking = LogTracking.from_log_line(first_line) if first_line else LogTracking() @@ -1101,8 +1178,8 @@ async def _incremental_import_log_file(self, log_path: Path) -> ReplayResult: id_map = tracking.id_map state = _ReplayState() - for line_num, (request_type, response_id, json_str) in enumerate( - self._iter_log_data_lines(log_path), start=tracking.last_uploaded_line + 1 + for request_type, response_id, json_str in iter_log_data_lines( + log_path, start_line=tracking.last_uploaded_line ): await self._import_entry( request_type, @@ -1113,8 +1190,8 @@ async def _incremental_import_log_file(self, log_path: Path) -> ReplayResult: state=state, ) - tracking.last_uploaded_line = line_num - self._update_tracking(log_path, tracking) + tracking.last_uploaded_line += 1 + update_tracking(log_path, tracking) if state.report is None: raise ValueError("No CreateTestReport found in log file") @@ -1129,34 +1206,6 @@ async def _incremental_import_log_file(self, log_path: Path) -> ReplayResult: # Log line parsing helpers # ------------------------------------------------------------------ - @staticmethod - def _iter_log_data_lines( - log_path: Path, - ) -> Generator[tuple[str, str | None, str], None, None]: - """Parse data lines from a log file, skipping the LogTracking header. - - Yields (request_type, response_id, json_str) tuples. - The file is read entirely under a shared lock and then released - before yielding, so callers can safely acquire exclusive locks - during iteration (e.g. ``_update_tracking``). - """ - line_pattern = re.compile(r"^\[(\w+)(?::([^\]]+))?\]\s*(.+)$") - with open(log_path) as f: - fcntl.flock(f, fcntl.LOCK_SH) - raw_lines = f.readlines() - - for raw_line in raw_lines: - line = raw_line.strip() - if not line: - continue - match = line_pattern.match(line) - if not match: - raise ValueError(f"Invalid log line: {line}") - request_type = match.group(1) - if request_type == "LogTracking": - continue - yield (request_type, match.group(2), match.group(3)) - async def _create_report_from_simulated(self, simulated: TestReport) -> TestReport: """Create a real test report from a simulated one.""" report_create = TestReportCreate( @@ -1214,96 +1263,9 @@ def _measurement_create_from_simulated( string_expected_value=simulated.string_expected_value, ) - @staticmethod - def _log_request_to_file( - log_file: str | Path, - request_type: str, - request: Any, - response_id: str | None = None, - ) -> None: - """Log a request to a file in JSON format. - - Args: - log_file: Path to the log file. - request_type: Type of request being logged. - request: The protobuf request to log. - response_id: Optional ID from the simulated response, embedded in the tag - for create operations so replay can map previously simulated IDs used by simulated updates. - """ - log_path = Path(log_file) - log_path.parent.mkdir(parents=True, exist_ok=True) - tag = f"{request_type}:{response_id}" if response_id else request_type - with open(log_path, "a") as f: - request_dict = json_format.MessageToDict(request) - request_json = json.dumps(request_dict, separators=(",", ":")) - f.write(f"[{tag}] {request_json}\n") - - @staticmethod - def _update_tracking(log_file: str | Path, tracking: LogTracking) -> None: - """Rewrite the LogTracking header (line 0) in place.""" - log_path = Path(log_file) - with open(log_path, "r+") as f: - fcntl.flock(f, fcntl.LOCK_EX) - lines = f.readlines() - lines[0] = tracking.to_log_line() - f.seek(0) - f.writelines(lines) - f.truncate() - - -def _client_version() -> str: - from importlib.metadata import PackageNotFoundError, version - - try: - return version("sift_stack_py") - except PackageNotFoundError: - return "unknown" - - -@dataclass -class LogTracking: - """Tracking metadata stored as line 0 of a log file.""" - - last_uploaded_line: int = 0 - id_map: dict[str, str] = field(default_factory=dict) - client_version: str = field(default_factory=_client_version) - - def to_log_line(self) -> str: - data = { - "clientVersion": self.client_version, - "lastUploadedLine": self.last_uploaded_line, - "idMap": self.id_map, - } - return f"[LogTracking] {json.dumps(data, separators=(',', ':'))}\n" - - @staticmethod - def from_log_line(line: str) -> LogTracking: - match = re.match(r"^\[LogTracking\]\s*(.+)$", line.strip()) - if not match: - return LogTracking() - data = json.loads(match.group(1)) - return LogTracking( - last_uploaded_line=data.get("lastUploadedLine", 0), - id_map=data.get("idMap", {}), - client_version=data.get("clientVersion", "unknown"), - ) - - -@dataclass -class _ReplayState: - """Mutable state accumulated during log replay.""" - - report: TestReport | None = None - steps_by_id: dict[str, TestStep] = field(default_factory=dict) - steps_order: list[str] = field(default_factory=list) - measurements_by_id: dict[str, TestMeasurement] = field(default_factory=dict) - measurements_order: list[str] = field(default_factory=list) - - -@dataclass -class ReplayResult: - """Result of replaying a log file.""" - report: TestReport - steps: list[TestStep] = field(default_factory=list) - measurements: list[TestMeasurement] = field(default_factory=list) +__all__ = [ + "LogTracking", + "ReplayResult", + "TestResultsLowLevelClient", +] diff --git a/python/lib/sift_client/_tests/conftest.py b/python/lib/sift_client/_tests/conftest.py index cf71728d3..5683182e5 100644 --- a/python/lib/sift_client/_tests/conftest.py +++ b/python/lib/sift_client/_tests/conftest.py @@ -78,17 +78,10 @@ def ci_pytest_tag(sift_client): return tag -from sift_client.util.test_results import ( - client_has_connection, # noqa: F401 - pytest_addoption, # noqa: F401 - pytest_runtest_makereport, # noqa: F401 -) -from sift_client.util.test_results import ( - module_substep_check_connection as module_substep, # noqa: F401 -) -from sift_client.util.test_results import ( - report_context_check_connection as report_context, # noqa: F401 -) -from sift_client.util.test_results import ( - step_check_connection as step, # noqa: F401 -) +# Import the Sift test results fixtures the way we recommend to users. +from sift_client.util.test_results import * # noqa: F403 + + +def pytest_configure(config: pytest.Config) -> None: + """Enable the Sift connection-check mode for the fixtures used in this test suite since we run w/ mock client in non-integration tests.""" + config.option.sift_test_results_check_connection = True diff --git a/python/lib/sift_client/_tests/resources/test_test_results.py b/python/lib/sift_client/_tests/resources/test_test_results.py index 8612665dc..05ef213a9 100644 --- a/python/lib/sift_client/_tests/resources/test_test_results.py +++ b/python/lib/sift_client/_tests/resources/test_test_results.py @@ -717,3 +717,56 @@ async def test_empty_log_file_raises(self, tmp_path): client = TestResultsLowLevelClient(grpc_client=MagicMock()) with pytest.raises(ValueError, match="No CreateTestReport found"): await client.import_log_file(log_file) + + def test_concurrent_append_and_update_tracking_preserves_all_lines(self, tmp_path): + """Writer appends concurrently with updater rewriting the header. + + Regression test for a race where ``log_request_to_file`` (append mode, no + lock) could have its freshly appended line clobbered by + ``update_tracking``'s read-modify-truncate cycle running under ``LOCK_EX`` + on another process/thread. Both sides must take the exclusive lock. + """ + import threading + import time + + from sift.test_reports.v1.test_reports_pb2 import CreateTestReportRequest + + from sift_client._internal.low_level_wrappers._test_results_log import ( + LogTracking, + log_request_to_file, + update_tracking, + ) + + log_file = tmp_path / "race.jsonl" + log_file.touch() + update_tracking(log_file, LogTracking(last_uploaded_line=0)) + + n_appends = 500 + stop = threading.Event() + request = CreateTestReportRequest() + + def writer() -> None: + for i in range(n_appends): + log_request_to_file(log_file, "CreateTestReport", request, response_id=str(i)) + + def updater() -> None: + tracking = LogTracking(last_uploaded_line=0) + while not stop.is_set(): + tracking.last_uploaded_line += 1 + update_tracking(log_file, tracking) + time.sleep(0) + + t_updater = threading.Thread(target=updater) + t_writer = threading.Thread(target=writer) + t_updater.start() + t_writer.start() + t_writer.join() + stop.set() + t_updater.join() + + with open(log_file) as f: + lines = [line for line in f if line.strip()] + data_lines = [line for line in lines if not line.startswith("[LogTracking]")] + assert len(data_lines) == n_appends, ( + f"expected {n_appends} appended data lines, found {len(data_lines)}" + ) diff --git a/python/lib/sift_client/_tests/util/conftest.py b/python/lib/sift_client/_tests/util/conftest.py index c8adf5687..45279cca6 100644 --- a/python/lib/sift_client/_tests/util/conftest.py +++ b/python/lib/sift_client/_tests/util/conftest.py @@ -6,7 +6,7 @@ def pytest_addoption(parser: pytest.Parser) -> None: # Flatten the list of lists into a single list of strings flat_options = [item for sublist in existing_options for item in sublist] if not any("--sift-test-results-log-file" in name for name in flat_options): - parser.addoption("--sift-test-results-log-file", action="store_true", default="false") + parser.addoption("--sift-test-results-log-file", action="store_true", default=False) def pytest_configure(config: pytest.Config) -> None: diff --git a/python/lib/sift_client/scripts/import_test_result_log.py b/python/lib/sift_client/scripts/import_test_result_log.py index 78bf6e832..7e14e4d59 100644 --- a/python/lib/sift_client/scripts/import_test_result_log.py +++ b/python/lib/sift_client/scripts/import_test_result_log.py @@ -11,6 +11,7 @@ from typing import TYPE_CHECKING from sift_client import SiftClient, SiftConnectionConfig +from sift_client.util.test_results.context_manager import log_replay_instructions if TYPE_CHECKING: from sift_client._internal.low_level_wrappers.test_results import ReplayResult @@ -79,14 +80,9 @@ def main() -> None: fp = os.path.abspath(args.log_file) if fp.startswith(tempfile.gettempdir()): os.remove(fp) - if result: - _print_result(result) except Exception as e: logger.error(e) - logger.error( - f"Error replaying log file: {args.log_file}.\n" - f" Can replay with `replay-test-result-log {args.log_file}`." - ) + log_replay_instructions(args.log_file) raise if result: diff --git a/python/lib/sift_client/util/test_results/__init__.py b/python/lib/sift_client/util/test_results/__init__.py index 839de0617..e7a82866c 100644 --- a/python/lib/sift_client/util/test_results/__init__.py +++ b/python/lib/sift_client/util/test_results/__init__.py @@ -64,8 +64,9 @@ def main(self): Import the `pytest_addoption` function to add configuration options for Test Results to the commandline or add the options to your pyproject.toml file (https://docs.pytest.org/en/stable/reference/customize.html#configuration). If ommitted, will use the default values described below. -- Git metadata: Include git metadata(repo, branch, commit) in the test results. Default is False. You can configure the test results by passing the `--sift-test-results-git-metadata` . +- Git metadata: Include git metadata (repo, branch, commit) in the test results. Default is True. You can disable it by passing `--no-sift-test-results-git-metadata`. - Log file: Write test results to a file. This happens automatically but you can configure specify a specific log file by passing `--sift-test-results-log-file=` or disable logging by passing `--sift-test-results-log-file=false`. +- Check connection: Pass `--sift-test-results-check-connection` (off by default) to make the `report_context`, `step`, and `module_substep` fixtures no-op when the Sift client has no connection to the server. Requires a `client_has_connection` fixture to be available. ###### Example at top of your test file or in your conftest.py file: @@ -107,13 +108,10 @@ def test_example(report_context, step): from .pytest_util import ( client_has_connection, module_substep, - module_substep_check_connection, pytest_addoption, pytest_runtest_makereport, report_context, - report_context_check_connection, step, - step_check_connection, ) __all__ = [ @@ -121,11 +119,8 @@ def test_example(report_context, step): "ReportContext", "client_has_connection", "module_substep", - "module_substep_check_connection", "pytest_addoption", "pytest_runtest_makereport", "report_context", - "report_context_check_connection", "step", - "step_check_connection", ] diff --git a/python/lib/sift_client/util/test_results/context_manager.py b/python/lib/sift_client/util/test_results/context_manager.py index d7439e03f..b8edc6800 100644 --- a/python/lib/sift_client/util/test_results/context_manager.py +++ b/python/lib/sift_client/util/test_results/context_manager.py @@ -7,7 +7,7 @@ import subprocess import tempfile import traceback -from contextlib import AbstractContextManager +from contextlib import AbstractContextManager, contextmanager from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING @@ -38,24 +38,58 @@ logger = logging.getLogger(__name__) +def log_replay_instructions(log_file: str | Path | None) -> None: + """Log instructions for manually replaying a test result log file. + + Used when an import/replay attempt fails so the user can retry against the same file. + """ + if log_file is None: + return + logger.error( + f"Error replaying log file: {log_file}.\n" + f" Can replay with `replay-test-result-log {log_file}`." + ) + + +@contextmanager +def _quiet_fork_stderr(): + """Redirect fd 2 to /dev/null across a ``fork()`` to discard gRPC's prefork notices. + + Redirecting fd 2 at the fd level (``os.dup2``) is what gRPC's handlers actually + write to, so wrapping a fork-site in this context manager reliably swallows those + notices without touching gRPC's global state. Scope the ``with`` block as tightly + as possible since it affects every thread in the process while active. + """ + saved_fd = os.dup(2) + devnull_fd = os.open(os.devnull, os.O_WRONLY) + try: + os.dup2(devnull_fd, 2) + os.close(devnull_fd) + yield + finally: + os.dup2(saved_fd, 2) + os.close(saved_fd) + + def _git_metadata() -> dict[str, str] | None: """Return git branch and commit hash, or None if not in a git repo.""" try: - branch = subprocess.check_output( - ["git", "rev-parse", "--abbrev-ref", "HEAD"], - stderr=subprocess.DEVNULL, - text=True, - ).strip() - commit = subprocess.check_output( - ["git", "rev-parse", "--short", "HEAD"], - stderr=subprocess.DEVNULL, - text=True, - ).strip() - repo = subprocess.check_output( - ["git", "remote", "get-url", "origin"], - stderr=subprocess.DEVNULL, - text=True, - ).strip() + with _quiet_fork_stderr(): + branch = subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + commit = subprocess.check_output( + ["git", "describe", "--always", "--dirty", "--exclude", "*"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + repo = subprocess.check_output( + ["git", "remote", "get-url", "origin"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() return {"git_repo": repo, "git_branch": branch, "git_commit": commit} except Exception: return None @@ -130,21 +164,13 @@ def __init__( def _open_import_proc(self): """Open a subprocess to import the log file.""" - # To avoid GRPC forking errors, temporarily redirect stderr at the fd level before forking, so the child inherits /dev/null on fd 2 when the atfork handler fires. - saved_stderr = os.dup(2) - devnull_fd = os.open(os.devnull, os.O_WRONLY) - os.dup2(devnull_fd, 2) - os.close(devnull_fd) - try: + with _quiet_fork_stderr(): self._import_proc = subprocess.Popen( ["import-test-result-log", "--incremental", str(self.log_file)], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) - finally: - os.dup2(saved_stderr, 2) - os.close(saved_stderr) def __enter__(self): if self.log_file: @@ -163,11 +189,12 @@ def __exit__(self, exc_type, exc_value, traceback): if self._import_proc is not None: try: - self._import_proc.communicate(timeout=10) + self._import_proc.communicate(timeout=1) except subprocess.TimeoutExpired: logger.error("Import process did not exit in 10s, killing it") self._import_proc.kill() self._import_proc.wait() + log_replay_instructions(self.log_file) raise return True diff --git a/python/lib/sift_client/util/test_results/pytest_util.py b/python/lib/sift_client/util/test_results/pytest_util.py index 645b6c047..ae22fa122 100644 --- a/python/lib/sift_client/util/test_results/pytest_util.py +++ b/python/lib/sift_client/util/test_results/pytest_util.py @@ -27,10 +27,20 @@ def pytest_addoption(parser: pytest.Parser) -> None: "or a file path to write to a specific location.", ) parser.addoption( - "--sift-test-results-git-metadata", - action="store_true", + "--no-sift-test-results-git-metadata", + action="store_false", + dest="sift_test_results_git_metadata", default=True, - help="Include git metadata in the Sift test results.", + help="Exclude git metadata from the Sift test results. " + "Git metadata (repo, branch, commit) is included by default.", + ) + parser.addoption( + "--sift-test-results-check-connection", + action="store_true", + default=False, + help="Skip the sift test-result fixtures (report_context, step, module_substep) " + "when the Sift client has no connection to the server. Requires a " + "`client_has_connection` fixture to be available in the test session.", ) @@ -78,9 +88,9 @@ def _report_context_impl( test_case = test_path if test_path.exists() else base_name log_file = _resolve_log_file(pytestconfig) include_git_metadata = ( - bool(pytestconfig.getoption("--sift-test-results-git-metadata", default=False)) + bool(pytestconfig.getoption("sift_test_results_git_metadata", default=True)) if pytestconfig - else False + else True ) with ReportContext( sift_client, @@ -95,6 +105,18 @@ def _report_context_impl( yield context +def _check_connection_enabled(pytestconfig: pytest.Config | None) -> bool: + """Return True when the caller opted into `--sift-test-results-check-connection`.""" + if pytestconfig is None: + return False + return bool(pytestconfig.getoption("sift_test_results_check_connection", default=False)) + + +def _has_sift_connection(request: pytest.FixtureRequest) -> bool: + """Resolve the `client_has_connection` fixture lazily; only called when the check is enabled.""" + return bool(request.getfixturevalue("client_has_connection")) + + @pytest.fixture(scope="session", autouse=True) def report_context( sift_client: SiftClient, request: pytest.FixtureRequest, pytestconfig: pytest.Config @@ -103,7 +125,14 @@ def report_context( The log file destination is controlled by ``--sift-test-results-log-file``. Defaults to a temp file when not set. + + When ``--sift-test-results-check-connection`` is passed, this fixture will no-op + (yield None) if the Sift client has no connection to the server. That mode + requires a ``client_has_connection`` fixture to be available in the session. """ + if _check_connection_enabled(pytestconfig) and not _has_sift_connection(request): + yield None + return yield from _report_context_impl(sift_client, request, pytestconfig=pytestconfig) @@ -126,17 +155,39 @@ def _step_impl( @pytest.fixture(autouse=True) def step( - report_context: ReportContext, request: pytest.FixtureRequest + report_context: ReportContext | None, + request: pytest.FixtureRequest, + pytestconfig: pytest.Config, ) -> Generator[NewStep | None, None, None]: - """Create an outer step for the function.""" + """Create an outer step for the function. + + No-ops when ``--sift-test-results-check-connection`` is set and the client + has no connection (or when the session-scoped ``report_context`` resolved to None). + """ + if report_context is None or ( + _check_connection_enabled(pytestconfig) and not _has_sift_connection(request) + ): + yield None + return yield from _step_impl(report_context, request) @pytest.fixture(scope="module", autouse=True) def module_substep( - report_context: ReportContext, request: pytest.FixtureRequest + report_context: ReportContext | None, + request: pytest.FixtureRequest, + pytestconfig: pytest.Config, ) -> Generator[NewStep | None, None, None]: - """Create a step per module.""" + """Create a step per module. + + No-ops when ``--sift-test-results-check-connection`` is set and the client + has no connection (or when the session-scoped ``report_context`` resolved to None). + """ + if report_context is None or ( + _check_connection_enabled(pytestconfig) and not _has_sift_connection(request) + ): + yield None + return yield from _step_impl(report_context, request) @@ -144,7 +195,8 @@ def module_substep( def client_has_connection(sift_client): """Check if the SiftClient has a connection to the Sift server. - Can be used to skip tests that require a connection to the Sift server. + Can be used to skip tests that require a connection to the Sift server, and is + consulted by the Sift fixtures when ``--sift-test-results-check-connection`` is set. """ has_connection = False try: @@ -153,45 +205,3 @@ def client_has_connection(sift_client): except Exception: has_connection = False return has_connection - - -######################################################## -# The following fixtures will conditionally create a report if the client has a connection to the Sift server. -# If you want to use these, you must also import or implement the client_has_connection fixture. -######################################################## - - -@pytest.fixture(scope="session", autouse=True) -def report_context_check_connection( - sift_client: SiftClient, - client_has_connection: bool, - request: pytest.FixtureRequest, - pytestconfig: pytest.Config, -) -> Generator[ReportContext | None, None, None]: - """Create a report context for the session. Doesn't run if the client has no connection to the Sift server.""" - if client_has_connection: - yield from _report_context_impl(sift_client, request, pytestconfig=pytestconfig) - else: - yield None - - -@pytest.fixture(autouse=True) -def step_check_connection( - report_context: ReportContext, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[NewStep | None, None, None]: - """Create an outer step for the function. Doesn't run if the client has no connection to the Sift server.""" - if client_has_connection: - yield from _step_impl(report_context, request) - else: - yield None - - -@pytest.fixture(scope="module", autouse=True) -def module_substep_check_connection( - report_context: ReportContext, client_has_connection: bool, request: pytest.FixtureRequest -) -> Generator[NewStep | None, None, None]: - """Create a step per module. Doesn't run if the client has no connection to the Sift server.""" - if client_has_connection: - yield from _step_impl(report_context, request) - else: - yield None From ae60ec431ee2e8da3e9d6f1704b2355dede5c22b Mon Sep 17 00:00:00 2001 From: Ian Later Date: Mon, 20 Apr 2026 15:40:32 -0700 Subject: [PATCH 8/9] pass current sift_client credentials to subproc --- .../sift_client/util/test_results/context_manager.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/python/lib/sift_client/util/test_results/context_manager.py b/python/lib/sift_client/util/test_results/context_manager.py index b8edc6800..d890e52ea 100644 --- a/python/lib/sift_client/util/test_results/context_manager.py +++ b/python/lib/sift_client/util/test_results/context_manager.py @@ -166,7 +166,17 @@ def _open_import_proc(self): """Open a subprocess to import the log file.""" with _quiet_fork_stderr(): self._import_proc = subprocess.Popen( - ["import-test-result-log", "--incremental", str(self.log_file)], + [ + "import-test-result-log", + "--incremental", + str(self.log_file), + "--grpc-url", + self.client.grpc_client._config.url, + "--rest-url", + self.client.rest_client._config.url, + "--api-key", + self.client.grpc_client._config.api_key, + ], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, From 280ad137414524bd0fb61a373ed3a7e7d9126b5f Mon Sep 17 00:00:00 2001 From: Ian Later Date: Wed, 22 Apr 2026 14:27:55 -0700 Subject: [PATCH 9/9] move log tracking to a sidecar file --- .../low_level_wrappers/_test_results_log.py | 174 ++++++++++-------- .../low_level_wrappers/test_results.py | 18 +- .../_tests/resources/test_test_results.py | 36 ++-- .../resources/sync_stubs/__init__.pyi | 2 +- .../lib/sift_client/resources/test_results.py | 2 +- .../util/test_results/context_manager.py | 4 +- 6 files changed, 124 insertions(+), 112 deletions(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py b/python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py index d9f9dd4df..4f41d7989 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/_test_results_log.py @@ -1,21 +1,42 @@ """Internal log-format primitives for test-result simulation logs. -Houses the file-format pieces that used to live inline in ``test_results.py``: +Two files per run: + +* **Log file** (e.g. ``foo.jsonl``) - append-only record of each logged API call, + one line per call. Written by :func:`log_request_to_file` in the test process + and read by :func:`iter_log_data_lines` / the replay subprocess. Has no header: + every line is a data line. +* **Tracking sidecar** (``foo.jsonl.tracking``) - small JSON file holding the + incremental replay cursor (``lastUploadedLine``) and the simulated-to-real ID + map. Written only by the replay subprocess via :meth:`LogTracking.save` using + a temp-file + ``os.replace`` so a crash can't leave a half-written sidecar. + Read once at replay start via :meth:`LogTracking.load`. Never touched by the + test process. -* Dataclasses describing the log header (``LogTracking``) and the intermediate - state accumulated while replaying a log (``_ReplayState``, ``ReplayResult``). -* Pure functions for writing log entries, rewriting the tracking header, and - parsing data lines. +# Concurrency -This module has no dependency on the low-level gRPC client; the replay -orchestration still lives on ``TestResultsLowLevelClient`` and uses these -helpers. +With tracking moved out of the main log, the log file becomes strictly +append-only and has exactly one in-place mutator (the writer) and one scanner +(the replay subprocess). POSIX guarantees that an ``O_APPEND`` write atomically +bumps the EOF, so parallel writers can't lose data. To keep a concurrent reader +from observing a mid-append partial final line we still take ``LOCK_EX`` on the +writer's single append and ``LOCK_SH`` on the reader's ``readlines()``; there +is never any exclusive-vs-exclusive contention because nothing rewrites the +file any more. + +The sidecar has a single writer (the replay subprocess) and no live reader, so +it needs no locking. Atomic rename is still used to keep the on-disk contents +valid across crashes. + +``flock`` is advisory, so this contract only holds for processes that use these +helpers; ad-hoc writers are not protected. """ from __future__ import annotations import fcntl import json +import os import re from dataclasses import dataclass, field from pathlib import Path @@ -38,39 +59,71 @@ def _client_version() -> str: @dataclass class LogTracking: - """Tracking metadata stored as line 0 of a log file. - - ``last_uploaded_line`` is the count of data lines (i.e. non-header lines) that - have been successfully uploaded. Each data line corresponds to a single API - call, so line granularity matches the atomic unit of work: a line is either - fully replayed or must be retried in its entirety. Data lines are strictly - append-only, so this counter is stable across header rewrites. + """Incremental-replay cursor and simulated-to-real ID map. + + Persisted beside the log file (see module docstring for layout). The log + file itself is append-only and stores only API-call data lines. + + * ``last_uploaded_line`` is the count of data lines that have been + successfully replayed against the server. Each data line corresponds to a + single API call, so line granularity matches the atomic unit of work: a + line is either fully replayed or must be retried in its entirety. Data + lines are strictly append-only, so this counter is stable across runs. + * ``id_map`` maps simulated response IDs (created during the original test + run) to the real IDs assigned by the server during replay. Subsequent + ``Update*`` entries consult this map to translate IDs. """ last_uploaded_line: int = 0 id_map: dict[str, str] = field(default_factory=dict) client_version: str = field(default_factory=_client_version) - def to_log_line(self) -> str: - data = { - "clientVersion": self.client_version, - "lastUploadedLine": self.last_uploaded_line, - "idMap": self.id_map, - } - return f"[LogTracking] {json.dumps(data, separators=(',', ':'))}\n" - @staticmethod - def from_log_line(line: str) -> LogTracking: - match = re.match(r"^\[LogTracking\]\s*(.+)$", line.strip()) - if not match: - return LogTracking() - data = json.loads(match.group(1)) - return LogTracking( + def sidecar_path(log_path: str | Path) -> Path: + """Return the sidecar path for a given log file (``.tracking``).""" + p = Path(log_path) + return p.with_name(p.name + ".tracking") + + @classmethod + def load(cls, log_path: str | Path) -> LogTracking: + """Read tracking state for ``log_path``; return a fresh instance if missing or corrupt. + + A missing sidecar is the normal state before the first incremental tick. + A malformed sidecar is treated the same so a crash mid-write can't brick + replay; the worst case is a re-replay of already-uploaded lines, which + the server must be prepared for anyway. + """ + sidecar = cls.sidecar_path(log_path) + try: + data = json.loads(sidecar.read_text()) + except (FileNotFoundError, json.JSONDecodeError, OSError): + return cls() + return cls( last_uploaded_line=data.get("lastUploadedLine", 0), id_map=data.get("idMap", {}), client_version=data.get("clientVersion", "unknown"), ) + def save(self, log_path: str | Path) -> None: + """Atomically write tracking state to the sidecar for ``log_path``. + + Uses temp-file + ``os.replace`` so readers (and crash recovery) never + observe a partially written sidecar. + """ + sidecar = self.sidecar_path(log_path) + sidecar.parent.mkdir(parents=True, exist_ok=True) + payload = json.dumps( + { + "clientVersion": self.client_version, + "lastUploadedLine": self.last_uploaded_line, + "idMap": self.id_map, + }, + separators=(",", ":"), + ) + tmp = sidecar.with_name(sidecar.name + ".tmp") + tmp.write_text(payload) + os.replace(tmp, sidecar) + @dataclass class _ReplayState: @@ -92,15 +145,6 @@ class ReplayResult: measurements: list[TestMeasurement] = field(default_factory=list) -# Concurrency -# ----------------- -# A test-result log file is written by the test process (:func:`log_request_to_file`) -# while the ``import-test-result-log --incremental`` subprocess concurrently reads it -# (:func:`iter_log_data_lines`) and rewrites its header line (:func:`update_tracking`). -# -# All three functions synchronize via ``fcntl.flock`` on an exclusive file lock - - def log_request_to_file( log_file: str | Path, request_type: str, @@ -109,9 +153,9 @@ def log_request_to_file( ) -> None: """Append a request as a JSON-encoded line to ``log_file``. - Holds ``LOCK_EX`` across the append so the incremental importer's - :func:`update_tracking` rewrite cannot race with it. See the module docstring - above for the full concurrency contract. + Takes ``LOCK_EX`` across the append so a concurrent reader holding + ``LOCK_SH`` in :func:`iter_log_data_lines` can't see a mid-write partial + final line. See the module docstring for the full concurrency model. Args: log_file: Path to the log file. @@ -129,55 +173,28 @@ def log_request_to_file( line = f"[{tag}] {request_json}\n" with open(log_path, "a") as f: fcntl.flock(f, fcntl.LOCK_EX) - # Closing the file flushes and releases the flock atomically; no explicit - # unlock needed here. + # Closing the file flushes and releases the flock atomically; no + # explicit unlock needed here. f.write(line) -def update_tracking(log_file: str | Path, tracking: LogTracking) -> None: - """Write the LogTracking header as line 0, creating it if missing. - - Holds ``LOCK_EX`` across the entire read-rewrite-truncate cycle so concurrent - :func:`log_request_to_file` appends cannot slip in between ``readlines()`` and - ``truncate()``; re-reads inside the lock so any lines appended since the last - tick are preserved when rewriting. See the module docstring above. - - If the file already has a ``[LogTracking]`` header on line 0, it is replaced - in place. Otherwise the header is inserted as a new line 0 (so we don't - clobber an existing data line). - """ - log_path = Path(log_file) - new_header = tracking.to_log_line() - with open(log_path, "r+") as f: - fcntl.flock(f, fcntl.LOCK_EX) - lines = f.readlines() - if lines and lines[0].startswith("[LogTracking]"): - lines[0] = new_header - else: - lines.insert(0, new_header) - f.seek(0) - f.writelines(lines) - f.truncate() - - def iter_log_data_lines( log_path: Path, start_line: int = 0, ) -> Generator[tuple[str, str | None, str], None, None]: - """Parse data lines from a log file, skipping the LogTracking header. + """Parse data lines from a log file. Yields ``(request_type, response_id, json_str)`` tuples. Each yielded item corresponds to one logged API call. ``start_line`` is the count of data lines (1-based) already uploaded; the - iterator skips the first ``start_line`` data lines and yields the rest. - Pass 0 to read all data lines. + iterator skips the first ``start_line`` lines and yields the rest. Pass 0 + to read all data lines. Acquires ``LOCK_SH`` only while snapshotting the file into memory, then - releases before yielding so callers can take ``LOCK_EX`` during iteration - (e.g. for :func:`update_tracking`). Any lines appended by a concurrent - :func:`log_request_to_file` call after the snapshot are not visible this - call -- they will be picked up on the next invocation. + releases before yielding. Lines appended by a concurrent + :func:`log_request_to_file` after the snapshot are not visible this call -- + they will be picked up on the next invocation. """ line_pattern = re.compile(r"^\[(\w+)(?::([^\]]+))?\]\s*(.+)$") with open(log_path) as f: @@ -192,10 +209,7 @@ def iter_log_data_lines( match = line_pattern.match(line) if not match: raise ValueError(f"Invalid log line: {line}") - request_type = match.group(1) - if request_type == "LogTracking": - continue data_line_count += 1 if data_line_count <= start_line: continue - yield (request_type, match.group(2), match.group(3)) + yield (match.group(1), match.group(2), match.group(3)) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py index 345f6b1c2..ac4896190 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py @@ -46,7 +46,6 @@ _ReplayState, iter_log_data_lines, log_request_to_file, - update_tracking, ) from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase from sift_client.sift_types.test_report import ( @@ -1166,15 +1165,14 @@ async def _batch_import_log_file(self, log_path: Path) -> ReplayResult: async def _incremental_import_log_file(self, log_path: Path) -> ReplayResult: """Replay line-by-line, issuing real API calls and updating tracking. - Resumes from ``LogTracking.last_uploaded_line`` so already-uploaded - entries are skipped on subsequent ticks rather than re-sent to the server. - Each data line is a single atomic API call; if replay of a line fails, - ``last_uploaded_line`` is not advanced so the whole line is retried next tick. + Resumes from ``LogTracking.last_uploaded_line`` (loaded from the + ``.tracking`` sidecar) so already-uploaded entries are skipped on + subsequent ticks rather than re-sent to the server. Each data line is a + single atomic API call; if replay of a line fails, + ``last_uploaded_line`` is not advanced so the whole line is retried + next tick. """ - with open(log_path) as f: - first_line = f.readline() - tracking = LogTracking.from_log_line(first_line) if first_line else LogTracking() - + tracking = LogTracking.load(log_path) id_map = tracking.id_map state = _ReplayState() @@ -1191,7 +1189,7 @@ async def _incremental_import_log_file(self, log_path: Path) -> ReplayResult: ) tracking.last_uploaded_line += 1 - update_tracking(log_path, tracking) + tracking.save(log_path) if state.report is None: raise ValueError("No CreateTestReport found in log file") diff --git a/python/lib/sift_client/_tests/resources/test_test_results.py b/python/lib/sift_client/_tests/resources/test_test_results.py index 05ef213a9..60941bfb9 100644 --- a/python/lib/sift_client/_tests/resources/test_test_results.py +++ b/python/lib/sift_client/_tests/resources/test_test_results.py @@ -698,11 +698,9 @@ def test_import_log_file_round_trip(self, sift_client, nostromo_run, tmp_path): @pytest.mark.asyncio async def test_malformed_log_line_skipped(self, tmp_path): - """Malformed lines are skipped; a file with no valid entries raises 'No CreateTestReport'.""" + """Malformed lines raise a ValueError during iteration.""" log_file = tmp_path / "bad.jsonl" - log_file.write_text( - '[LogTracking] {"lastUploadedLine":0,"idMap":{}}\nthis is not a valid log line\n' - ) + log_file.write_text("this is not a valid log line\n") client = TestResultsLowLevelClient(grpc_client=MagicMock()) with pytest.raises(ValueError, match="Invalid log line: this is not a valid log lin"): @@ -710,21 +708,22 @@ async def test_malformed_log_line_skipped(self, tmp_path): @pytest.mark.asyncio async def test_empty_log_file_raises(self, tmp_path): - """A log file with only a LogTracking header and no entries raises.""" + """A log file with no entries raises 'No CreateTestReport'.""" log_file = tmp_path / "empty.jsonl" - log_file.write_text('[LogTracking] {"lastUploadedLine":0,"idMap":{}}\n') + log_file.touch() client = TestResultsLowLevelClient(grpc_client=MagicMock()) with pytest.raises(ValueError, match="No CreateTestReport found"): await client.import_log_file(log_file) - def test_concurrent_append_and_update_tracking_preserves_all_lines(self, tmp_path): - """Writer appends concurrently with updater rewriting the header. + def test_concurrent_append_and_tracking_save_preserves_all_lines(self, tmp_path): + """Writer appending to the log shares no mutation point with the tracking sidecar. - Regression test for a race where ``log_request_to_file`` (append mode, no - lock) could have its freshly appended line clobbered by - ``update_tracking``'s read-modify-truncate cycle running under ``LOCK_EX`` - on another process/thread. Both sides must take the exclusive lock. + With tracking moved out of the main log into ``.tracking``, the log + file is strictly append-only -- there is no path by which concurrent + ``LogTracking.save`` calls can clobber appended lines. This test pins + that invariant: 500 writer appends run alongside a hot updater looping + on sidecar writes, and every append must survive. """ import threading import time @@ -734,12 +733,9 @@ def test_concurrent_append_and_update_tracking_preserves_all_lines(self, tmp_pat from sift_client._internal.low_level_wrappers._test_results_log import ( LogTracking, log_request_to_file, - update_tracking, ) log_file = tmp_path / "race.jsonl" - log_file.touch() - update_tracking(log_file, LogTracking(last_uploaded_line=0)) n_appends = 500 stop = threading.Event() @@ -753,7 +749,7 @@ def updater() -> None: tracking = LogTracking(last_uploaded_line=0) while not stop.is_set(): tracking.last_uploaded_line += 1 - update_tracking(log_file, tracking) + tracking.save(log_file) time.sleep(0) t_updater = threading.Thread(target=updater) @@ -765,8 +761,12 @@ def updater() -> None: t_updater.join() with open(log_file) as f: - lines = [line for line in f if line.strip()] - data_lines = [line for line in lines if not line.startswith("[LogTracking]")] + data_lines = [line for line in f if line.strip()] assert len(data_lines) == n_appends, ( f"expected {n_appends} appended data lines, found {len(data_lines)}" ) + + sidecar = LogTracking.sidecar_path(log_file) + assert sidecar.exists() + reloaded = LogTracking.load(log_file) + assert reloaded.last_uploaded_line >= 1 diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index 241047cfa..de0107068 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -2010,7 +2010,7 @@ class TestResultsAPI: Args: log_file: Path to the log file to import. - incremental: (internal tooling) If True, goes line by line and calls every event vs. reading the entire file at once and sending resultant test report. + incremental: (internal tooling) If True, goes line by line and calls API every event -- keeps track of last line sent so it can be called after some updates and be additive vs. replaying the entire log file each time(i.e. when False, reads the entire log file, building a test report in memory, then sends the calls for each step/measurement to the API). Returns: A ReplayResult containing the created report, steps, and measurements. diff --git a/python/lib/sift_client/resources/test_results.py b/python/lib/sift_client/resources/test_results.py index 927ab27a2..a6ffec2e0 100644 --- a/python/lib/sift_client/resources/test_results.py +++ b/python/lib/sift_client/resources/test_results.py @@ -629,7 +629,7 @@ async def import_log_file( Args: log_file: Path to the log file to import. - incremental: (internal tooling) If True, goes line by line and calls every event vs. reading the entire file at once and sending resultant test report. + incremental: (internal tooling) If True, goes line by line and calls API every event -- keeps track of last line sent so it can be called after some updates and be additive vs. replaying the entire log file each time(i.e. when False, reads the entire log file, building a test report in memory, then sends the calls for each step/measurement to the API). Returns: A ReplayResult containing the created report, steps, and measurements. diff --git a/python/lib/sift_client/util/test_results/context_manager.py b/python/lib/sift_client/util/test_results/context_manager.py index d890e52ea..4c179c060 100644 --- a/python/lib/sift_client/util/test_results/context_manager.py +++ b/python/lib/sift_client/util/test_results/context_manager.py @@ -171,9 +171,9 @@ def _open_import_proc(self): "--incremental", str(self.log_file), "--grpc-url", - self.client.grpc_client._config.url, + self.client.grpc_client._config.uri, "--rest-url", - self.client.rest_client._config.url, + self.client.rest_client._config.base_url, "--api-key", self.client.grpc_client._config.api_key, ],