diff --git a/src/aieng_bot/observability/activity_logger.py b/src/aieng_bot/observability/activity_logger.py index 42c81a4..6908ec9 100644 --- a/src/aieng_bot/observability/activity_logger.py +++ b/src/aieng_bot/observability/activity_logger.py @@ -15,22 +15,33 @@ class ActivityLogger: """Logger for bot fix and merge activities. - Records all bot activity to a unified log file in GCS for dashboard consumption. - Every invocation of the fix command logs an activity here. + Each fix is written to a unique per-run sidecar file in GCS so that + concurrent workflow jobs never overwrite each other's entries. After writing + the sidecar, the combined ``bot_activity_log.json`` is rebuilt from all + sidecars plus any historical entries in the existing log. Parameters ---------- bucket : str, optional GCS bucket name (default="bot-dashboard-vectorinstitute"). log_path : str, optional - Path to activity log in GCS (default="data/bot_activity_log.json"). + Path to combined activity log in GCS (default="data/bot_activity_log.json"). + entries_prefix : str, optional + GCS key prefix for individual activity entry files + (default="data/activity_entries/"). Attributes ---------- bucket : str GCS bucket name. log_path : str - Path to activity log in GCS. + Path to combined activity log in GCS. + gcs_uri : str + Full GCS URI for combined activity log. + entries_prefix : str + GCS key prefix for individual activity entry files. + entries_gcs_prefix : str + Full GCS URI prefix for individual activity entry files. """ @@ -38,6 +49,7 @@ def __init__( self, bucket: str = "bot-dashboard-vectorinstitute", log_path: str = "data/bot_activity_log.json", + entries_prefix: str = "data/activity_entries/", ): """Initialize activity logger. @@ -46,57 +58,93 @@ def __init__( bucket : str, optional GCS bucket name (default="bot-dashboard-vectorinstitute"). log_path : str, optional - Path to activity log in GCS (default="data/bot_activity_log.json"). + Path to combined activity log in GCS + (default="data/bot_activity_log.json"). + entries_prefix : str, optional + GCS key prefix for individual activity entry files + (default="data/activity_entries/"). """ self.bucket = bucket self.log_path = log_path self.gcs_uri = f"gs://{bucket}/{log_path}" + self.entries_prefix = entries_prefix + self.entries_gcs_prefix = f"gs://{bucket}/{entries_prefix}" - def _load_activity_log(self) -> dict | None: # noqa: PYI041 - """Load existing activity log from GCS. + def _gcs_upload(self, local_path: str, gcs_uri: str) -> bool: + """Upload a local file to GCS. + + Parameters + ---------- + local_path : str + Local file path to upload. + gcs_uri : str + Destination GCS URI. + + Returns + ------- + bool + True on success, False on failure. + + """ + try: + subprocess.run( + ["gcloud", "storage", "cp", local_path, gcs_uri], + check=True, + capture_output=True, + ) + return True + except subprocess.CalledProcessError as e: + log_error(f"Failed to upload to {gcs_uri}: {e.stderr or e.stdout}") + return False + + def _gcs_read_json(self, gcs_uri: str) -> dict | None: # noqa: PYI041 + """Read and parse a JSON file from GCS. + + Parameters + ---------- + gcs_uri : str + GCS URI to read. Returns ------- dict | None - Activity log with 'activities' list and 'last_updated' timestamp. - Returns empty structure if file doesn't exist (404). - Returns None if load failed for any other reason (caller must not save). + Parsed JSON dict, empty structure if file does not exist (404), + or None on any other error (caller must not overwrite). """ try: result = subprocess.run( - ["gcloud", "storage", "cat", self.gcs_uri], + ["gcloud", "storage", "cat", gcs_uri], capture_output=True, text=True, check=True, ) return json.loads(result.stdout) except subprocess.CalledProcessError as e: - # Distinguish "not found" from auth/network failures. - # gcloud storage cat prints "No URLs matched" or "NotFound" for missing objects. stderr = e.stderr or "" stdout = e.stdout or "" not_found_signals = ("No URLs matched", "NotFound", "does not exist", "404") if any(s in stderr or s in stdout for s in not_found_signals): - # File doesn't exist yet — safe to create return {"activities": [], "last_updated": None} - # Any other failure (auth, network, permissions) — do NOT overwrite - log_error( - f"Failed to load activity log from GCS (aborting save): {stderr or stdout}" - ) + log_error(f"Failed to read {gcs_uri}: {stderr or stdout}") return None except json.JSONDecodeError as e: - log_error(f"Failed to parse activity log: {e}") + log_error(f"Failed to parse JSON from {gcs_uri}: {e}") return None - def _save_activity_log(self, log_data: dict) -> bool: - """Save activity log to GCS. + def _write_entry_sidecar(self, activity: dict, workflow_run_id: str) -> bool: + """Write a single activity entry to its own unique GCS file. + + Using ``workflow_run_id`` as the filename guarantees concurrent jobs + never write to the same path, so no read-modify-write is needed here. Parameters ---------- - log_data : dict - Activity log data to save. + activity : dict + Activity entry data. + workflow_run_id : str + Unique workflow run identifier, used as the sidecar filename. Returns ------- @@ -104,30 +152,107 @@ def _save_activity_log(self, log_data: dict) -> bool: True on success, False on failure. """ + gcs_uri = f"{self.entries_gcs_prefix}{workflow_run_id}.json" try: - # Write to temp file with tempfile.NamedTemporaryFile( mode="w", delete=False, suffix=".json" ) as f: - json.dump(log_data, f, indent=2) + json.dump(activity, f, indent=2) temp_path = f.name + success = self._gcs_upload(temp_path, gcs_uri) + os.unlink(temp_path) + return success + except Exception as e: + log_error(f"Failed to write activity sidecar: {e}") + return False - # Upload to GCS - subprocess.run( - ["gcloud", "storage", "cp", temp_path, self.gcs_uri], - check=True, + def _list_entry_sidecars(self) -> list[str]: + """List all activity entry sidecar files in GCS. + + Returns + ------- + list[str] + List of GCS URIs for all sidecar entry files. + + """ + try: + result = subprocess.run( + ["gcloud", "storage", "ls", self.entries_gcs_prefix], capture_output=True, + text=True, + check=True, ) + return [ + line.strip() + for line in result.stdout.splitlines() + if line.strip().endswith(".json") + ] + except subprocess.CalledProcessError: + return [] + + def _rebuild_combined_log(self) -> bool: + """Rebuild ``bot_activity_log.json`` from all sidecar entries. + + Reads every sidecar under ``entries_prefix`` and merges them with any + existing entries in the combined log (preserving historical records that + predate the sidecar approach). Deduplicates by + ``(repo, pr_number, workflow_run_id)`` and sorts by timestamp. + + Because this method only writes a freshly computed view of all known + entries, concurrent calls are safe: the last writer will include all + sidecars that existed at the time of its read, and any entries written + concurrently will be picked up on the next rebuild. - # Clean up temp file - os.unlink(temp_path) + Returns + ------- + bool + True on success, False on failure. - return True - except subprocess.CalledProcessError as e: - log_error(f"Failed to upload activity log to GCS: {e}") - return False + """ + # Collect entries from all sidecars + sidecar_activities: list[dict] = [] + for uri in self._list_entry_sidecars(): + data = self._gcs_read_json(uri) + if data and isinstance(data, dict) and "repo" in data: + sidecar_activities.append(data) + + # Collect entries from existing combined log (historical, pre-sidecar) + existing = self._gcs_read_json(self.gcs_uri) + existing_activities: list[dict] = [] + if existing and isinstance(existing.get("activities"), list): + existing_activities = existing["activities"] + + # Merge and deduplicate — sidecars are authoritative for overlapping keys + seen: set[tuple] = set() + merged: list[dict] = [] + for activity in sidecar_activities + existing_activities: + key = ( + activity.get("repo", ""), + activity.get("pr_number", 0), + activity.get("workflow_run_id", ""), + ) + if key not in seen: + seen.add(key) + merged.append(activity) + + merged.sort(key=lambda a: a.get("timestamp") or "") + + log_data = { + "activities": merged, + "last_updated": datetime.now(timezone.utc).isoformat(), + } + + try: + with tempfile.NamedTemporaryFile( + mode="w", delete=False, suffix=".json" + ) as f: + json.dump(log_data, f, indent=2) + temp_path = f.name + success = self._gcs_upload(temp_path, self.gcs_uri) + os.unlink(temp_path) + return success except Exception as e: - log_error(f"Failed to save activity log: {e}") + log_error(f"Failed to rebuild combined activity log: {e}") return False def log_fix( @@ -146,8 +271,10 @@ def log_fix( ) -> bool: """Log a fix and merge activity. - This is called every time the fix command processes a PR, whether it - needed fixing or just rebasing and merging. + Writes the activity to a unique sidecar file first (safe against + concurrent jobs), then rebuilds the combined log from all sidecars. + If the rebuild fails the activity is still durably recorded in the + sidecar and will be included on the next successful rebuild. Parameters ---------- @@ -178,21 +305,13 @@ def log_fix( Returns ------- bool - True on success, False on failure. + True if the activity sidecar was written successfully (combined log + rebuild failure does not cause a False return). + False only if the sidecar write itself failed. """ log_info(f"Recording fix activity for {repo}#{pr_number}") - # Load existing log — returns None if load failed (auth/network error) - log_data = self._load_activity_log() - if log_data is None: - log_error( - f"Aborting activity log write for {repo}#{pr_number} " - "to prevent overwriting existing data after a GCS read failure" - ) - return False - - # Create activity entry with both failure_types (new) and failure_type (backward compat) activity = { "repo": repo, "pr_number": pr_number, @@ -209,18 +328,23 @@ def log_fix( "fix_time_hours": fix_time_hours, } - # Append activity - log_data["activities"].append(activity) - log_data["last_updated"] = datetime.now(timezone.utc).isoformat() - - # Save to GCS failure_types_str = ",".join(failure_types) - if self._save_activity_log(log_data): - log_success( - f"Fix activity recorded for {repo}#{pr_number} " - f"(status: {status}, types: {failure_types_str})" + + # Step 1: Write sidecar — unique filename guarantees no concurrent clobber + if not self._write_entry_sidecar(activity, workflow_run_id): + log_error(f"Failed to write activity sidecar for {repo}#{pr_number}") + return False + + log_success( + f"Fix activity sidecar written for {repo}#{pr_number} " + f"(status: {status}, types: {failure_types_str})" + ) + + # Step 2: Rebuild combined log — best-effort; data is safe in the sidecar + if not self._rebuild_combined_log(): + log_error( + f"Failed to rebuild combined activity log after {repo}#{pr_number} " + "(activity is safe in sidecar and will appear on next rebuild)" ) - return True - log_error(f"Failed to record fix activity for {repo}#{pr_number}") - return False + return True diff --git a/tests/observability/test_activity_logger.py b/tests/observability/test_activity_logger.py index dc75e70..e7b1bdd 100644 --- a/tests/observability/test_activity_logger.py +++ b/tests/observability/test_activity_logger.py @@ -15,29 +15,35 @@ def activity_logger(): return ActivityLogger( bucket="test-bucket", log_path="data/test_activity_log.json", + entries_prefix="data/test_activity_entries/", ) @pytest.fixture -def sample_activity_log(): - """Create a sample activity log structure.""" +def sample_activity(): + """Create a sample activity entry.""" return { - "activities": [ - { - "repo": "VectorInstitute/test-repo", - "pr_number": 41, - "pr_title": "Previous PR", - "pr_author": "app/dependabot", - "pr_url": "https://github.com/VectorInstitute/test-repo/pull/41", - "timestamp": "2025-12-19T10:00:00Z", - "workflow_run_id": "111111", - "github_run_url": "https://github.com/.../actions/runs/111111", - "status": "SUCCESS", - "failure_type": "lint", - "trace_path": "traces/2025/12/19/test-repo-pr-41.json", - "fix_time_hours": 0.25, - } - ], + "repo": "VectorInstitute/test-repo", + "pr_number": 41, + "pr_title": "Previous PR", + "pr_author": "app/dependabot", + "pr_url": "https://github.com/VectorInstitute/test-repo/pull/41", + "timestamp": "2025-12-19T10:00:00Z", + "workflow_run_id": "111111", + "github_run_url": "https://github.com/.../actions/runs/111111", + "status": "SUCCESS", + "failure_types": ["lint"], + "failure_type": "lint", + "trace_path": "data/traces/test-repo/41/111111.json", + "fix_time_hours": 0.25, + } + + +@pytest.fixture +def sample_activity_log(sample_activity): + """Create a sample combined activity log structure.""" + return { + "activities": [sample_activity], "last_updated": "2025-12-19T10:00:00Z", } @@ -55,94 +61,86 @@ def test_init_with_defaults(self): logger.gcs_uri == "gs://bot-dashboard-vectorinstitute/data/bot_activity_log.json" ) + assert logger.entries_prefix == "data/activity_entries/" + assert ( + logger.entries_gcs_prefix + == "gs://bot-dashboard-vectorinstitute/data/activity_entries/" + ) def test_init_with_custom_values(self): """Test initialization with custom bucket and log path.""" - logger = ActivityLogger(bucket="custom-bucket", log_path="custom/path.json") + logger = ActivityLogger( + bucket="custom-bucket", + log_path="custom/path.json", + entries_prefix="custom/entries/", + ) assert logger.bucket == "custom-bucket" assert logger.log_path == "custom/path.json" assert logger.gcs_uri == "gs://custom-bucket/custom/path.json" + assert logger.entries_prefix == "custom/entries/" + assert logger.entries_gcs_prefix == "gs://custom-bucket/custom/entries/" -class TestLoadActivityLog: - """Tests for _load_activity_log method.""" +class TestGcsReadJson: + """Tests for _gcs_read_json method.""" - def test_load_existing_log(self, activity_logger, sample_activity_log): - """Test loading an existing activity log.""" + def test_read_existing_file(self, activity_logger, sample_activity_log): + """Test reading an existing JSON file from GCS.""" mock_result = Mock() mock_result.stdout = json.dumps(sample_activity_log) with patch("subprocess.run", return_value=mock_result) as mock_run: - result = activity_logger._load_activity_log() + result = activity_logger._gcs_read_json(activity_logger.gcs_uri) - # Verify gcloud command was called correctly mock_run.assert_called_once_with( ["gcloud", "storage", "cat", activity_logger.gcs_uri], capture_output=True, text=True, check=True, ) - - # Verify returned data assert result == sample_activity_log - assert len(result["activities"]) == 1 - assert result["last_updated"] == "2025-12-19T10:00:00Z" - def test_load_nonexistent_log(self, activity_logger): - """Test loading when log file doesn't exist (404 / no URLs matched).""" + def test_read_nonexistent_file(self, activity_logger): + """Test reading a file that does not exist (404).""" error = subprocess.CalledProcessError(1, "gcloud") error.stderr = ( "ERROR: No URLs matched: gs://test-bucket/data/test_activity_log.json" ) error.stdout = "" with patch("subprocess.run", side_effect=error): - result = activity_logger._load_activity_log() + result = activity_logger._gcs_read_json(activity_logger.gcs_uri) - # Should return empty structure — safe to create new file assert result == {"activities": [], "last_updated": None} - def test_load_gcs_auth_failure(self, activity_logger): - """Test loading when gcloud fails due to auth/permission error.""" + def test_read_auth_failure_returns_none(self, activity_logger): + """Test that auth/permission failures return None (not empty dict).""" error = subprocess.CalledProcessError(1, "gcloud") error.stderr = ( "ERROR: (gcloud.storage.cat) User does not have storage.objects.get access" ) error.stdout = "" with patch("subprocess.run", side_effect=error): - result = activity_logger._load_activity_log() + result = activity_logger._gcs_read_json(activity_logger.gcs_uri) - # Should return None — caller must not overwrite existing data assert result is None - def test_load_invalid_json(self, activity_logger): - """Test loading when log contains invalid JSON.""" + def test_read_invalid_json_returns_none(self, activity_logger): + """Test that corrupted JSON returns None.""" mock_result = Mock() mock_result.stdout = "invalid json content" with patch("subprocess.run", return_value=mock_result): - result = activity_logger._load_activity_log() + result = activity_logger._gcs_read_json(activity_logger.gcs_uri) - # Should return None — corrupted file, do not overwrite assert result is None - def test_load_empty_json(self, activity_logger): - """Test loading when log is empty JSON object.""" - mock_result = Mock() - mock_result.stdout = "{}" - - with patch("subprocess.run", return_value=mock_result): - result = activity_logger._load_activity_log() - - # Should return the empty object - assert result == {} - -class TestSaveActivityLog: - """Tests for _save_activity_log method.""" +class TestWriteEntrySidecar: + """Tests for _write_entry_sidecar method.""" - def test_save_success(self, activity_logger, sample_activity_log): - """Test successfully saving activity log.""" + def test_write_sidecar_success(self, activity_logger, sample_activity): + """Test successfully writing an activity sidecar.""" mock_file_path = "/tmp/test_12345.json" with ( @@ -150,68 +148,189 @@ def test_save_success(self, activity_logger, sample_activity_log): patch("subprocess.run") as mock_run, patch("os.unlink") as mock_unlink, ): - # Mock temporary file mock_file = MagicMock() mock_file.name = mock_file_path mock_file.__enter__ = Mock(return_value=mock_file) mock_file.__exit__ = Mock(return_value=False) mock_tempfile.return_value = mock_file - # Test save - result = activity_logger._save_activity_log(sample_activity_log) + result = activity_logger._write_entry_sidecar(sample_activity, "111111") - # Verify success assert result is True - - # Verify tempfile was created - mock_tempfile.assert_called_once_with( - mode="w", delete=False, suffix=".json" - ) - - # Verify gcloud upload was called + expected_gcs_uri = f"{activity_logger.entries_gcs_prefix}111111.json" mock_run.assert_called_once_with( - ["gcloud", "storage", "cp", mock_file_path, activity_logger.gcs_uri], + ["gcloud", "storage", "cp", mock_file_path, expected_gcs_uri], check=True, capture_output=True, ) - - # Verify temp file was cleaned up mock_unlink.assert_called_once_with(mock_file_path) - def test_save_subprocess_error(self, activity_logger, sample_activity_log): - """Test save failure due to subprocess error.""" - mock_file_path = "/tmp/test_12345.json" + def test_write_sidecar_uses_workflow_run_id_as_filename( + self, activity_logger, sample_activity + ): + """Test that workflow_run_id is used as the sidecar filename.""" + with ( + patch("tempfile.NamedTemporaryFile") as mock_tempfile, + patch("subprocess.run") as mock_run, + patch("os.unlink"), + ): + mock_file = MagicMock() + mock_file.name = "/tmp/x.json" + mock_file.__enter__ = Mock(return_value=mock_file) + mock_file.__exit__ = Mock(return_value=False) + mock_tempfile.return_value = mock_file + + activity_logger._write_entry_sidecar(sample_activity, "99999") + dst = mock_run.call_args[0][0][-1] + assert dst == f"{activity_logger.entries_gcs_prefix}99999.json" + + def test_write_sidecar_upload_failure(self, activity_logger, sample_activity): + """Test sidecar write failure on upload error.""" with ( patch("tempfile.NamedTemporaryFile") as mock_tempfile, patch( "subprocess.run", side_effect=subprocess.CalledProcessError(1, "gcloud"), ), + patch("os.unlink"), ): - # Mock temporary file mock_file = MagicMock() - mock_file.name = mock_file_path + mock_file.name = "/tmp/x.json" mock_file.__enter__ = Mock(return_value=mock_file) mock_file.__exit__ = Mock(return_value=False) mock_tempfile.return_value = mock_file - # Test save - result = activity_logger._save_activity_log(sample_activity_log) + result = activity_logger._write_entry_sidecar(sample_activity, "111111") - # Verify failure assert result is False - def test_save_general_exception(self, activity_logger, sample_activity_log): - """Test save failure due to general exception.""" + +class TestListEntrySidecars: + """Tests for _list_entry_sidecars method.""" + + def test_list_returns_json_uris(self, activity_logger): + """Test listing sidecar files returns only .json URIs.""" + mock_result = Mock() + mock_result.stdout = ( + "gs://test-bucket/data/test_activity_entries/111111.json\n" + "gs://test-bucket/data/test_activity_entries/222222.json\n" + ) + + with patch("subprocess.run", return_value=mock_result): + result = activity_logger._list_entry_sidecars() + + assert result == [ + "gs://test-bucket/data/test_activity_entries/111111.json", + "gs://test-bucket/data/test_activity_entries/222222.json", + ] + + def test_list_empty_on_error(self, activity_logger): + """Test listing returns empty list on GCS error.""" with patch( - "tempfile.NamedTemporaryFile", - side_effect=Exception("File system error"), + "subprocess.run", + side_effect=subprocess.CalledProcessError(1, "gcloud"), + ): + result = activity_logger._list_entry_sidecars() + + assert result == [] + + +class TestRebuildCombinedLog: + """Tests for _rebuild_combined_log method.""" + + def test_rebuild_merges_sidecars_and_existing( + self, activity_logger, sample_activity + ): + """Test that rebuild merges sidecars with existing log entries.""" + new_activity = {**sample_activity, "pr_number": 42, "workflow_run_id": "222222"} + existing_log = { + "activities": [sample_activity], + "last_updated": "2025-12-19T10:00:00Z", + } + + with ( + patch.object( + activity_logger, + "_list_entry_sidecars", + return_value=[ + "gs://test-bucket/data/test_activity_entries/222222.json" + ], + ), + patch.object( + activity_logger, + "_gcs_read_json", + side_effect=[new_activity, existing_log], + ), + patch.object(activity_logger, "_gcs_upload", return_value=True), + patch("tempfile.NamedTemporaryFile") as mock_tempfile, + patch("os.unlink"), + ): + mock_file = MagicMock() + mock_file.name = "/tmp/x.json" + mock_file.__enter__ = Mock(return_value=mock_file) + mock_file.__exit__ = Mock(return_value=False) + mock_tempfile.return_value = mock_file + + result = activity_logger._rebuild_combined_log() + + assert result is True + + def test_rebuild_deduplicates_by_run_id(self, activity_logger, sample_activity): + """Test that rebuild deduplicates entries with the same workflow_run_id.""" + existing_log = { + "activities": [sample_activity], + "last_updated": "2025-12-19T10:00:00Z", + } + + with ( + patch.object( + activity_logger, + "_list_entry_sidecars", + return_value=[ + "gs://test-bucket/data/test_activity_entries/111111.json" + ], + ), + patch.object( + activity_logger, + "_gcs_read_json", + # sidecar has same workflow_run_id as existing entry + side_effect=[sample_activity, existing_log], + ), + patch.object(activity_logger, "_gcs_upload", return_value=True), + patch("tempfile.NamedTemporaryFile") as mock_tempfile, + patch("os.unlink"), + ): + mock_file = MagicMock() + mock_file.name = "/tmp/x.json" + mock_file.__enter__ = Mock(return_value=mock_file) + mock_file.__exit__ = Mock(return_value=False) + mock_tempfile.return_value = mock_file + + result = activity_logger._rebuild_combined_log() + assert result is True + + def test_rebuild_upload_failure(self, activity_logger): + """Test rebuild returns False when upload fails.""" + with ( + patch.object(activity_logger, "_list_entry_sidecars", return_value=[]), + patch.object( + activity_logger, + "_gcs_read_json", + return_value={"activities": [], "last_updated": None}, + ), + patch.object(activity_logger, "_gcs_upload", return_value=False), + patch("tempfile.NamedTemporaryFile") as mock_tempfile, + patch("os.unlink"), ): - # Test save - result = activity_logger._save_activity_log(sample_activity_log) + mock_file = MagicMock() + mock_file.name = "/tmp/x.json" + mock_file.__enter__ = Mock(return_value=mock_file) + mock_file.__exit__ = Mock(return_value=False) + mock_tempfile.return_value = mock_file + + result = activity_logger._rebuild_combined_log() - # Verify failure assert result is False @@ -222,13 +341,11 @@ def test_log_fix_success(self, activity_logger): """Test logging fix activity successfully.""" with ( patch.object( - activity_logger, - "_load_activity_log", - return_value={"activities": [], "last_updated": None}, - ) as mock_load, + activity_logger, "_write_entry_sidecar", return_value=True + ) as mock_sidecar, patch.object( - activity_logger, "_save_activity_log", return_value=True - ) as mock_save, + activity_logger, "_rebuild_combined_log", return_value=True + ) as mock_rebuild, ): result = activity_logger.log_fix( repo="VectorInstitute/test-repo", @@ -240,49 +357,87 @@ def test_log_fix_success(self, activity_logger): github_run_url="https://github.com/.../actions/runs/123456789", status="SUCCESS", failure_types=["test"], - trace_path="traces/2025/12/19/test-repo-pr-42.json", + trace_path="data/traces/test-repo/42/123456789.json", fix_time_hours=0.5, ) - # Verify success assert result is True - # Verify load was called - mock_load.assert_called_once() - - # Verify save was called with correct data - mock_save.assert_called_once() - saved_data = mock_save.call_args[0][0] - assert len(saved_data["activities"]) == 1 - activity = saved_data["activities"][0] - + # Sidecar is written with correct data + mock_sidecar.assert_called_once() + activity, run_id = mock_sidecar.call_args[0] + assert run_id == "123456789" assert activity["repo"] == "VectorInstitute/test-repo" assert activity["pr_number"] == 42 - assert activity["pr_title"] == "Bump dependency" - assert activity["pr_author"] == "app/dependabot" - assert activity["workflow_run_id"] == "123456789" assert activity["status"] == "SUCCESS" - assert ( - activity["failure_type"] == "test" - ) # Primary type for backward compat - assert activity["failure_types"] == ["test"] # Array of types - assert activity["trace_path"] == "traces/2025/12/19/test-repo-pr-42.json" + assert activity["failure_type"] == "test" + assert activity["failure_types"] == ["test"] assert activity["fix_time_hours"] == 0.5 + # Combined log is rebuilt + mock_rebuild.assert_called_once() + + def test_log_fix_sidecar_failure_returns_false(self, activity_logger): + """Test that sidecar write failure causes log_fix to return False.""" + with ( + patch.object(activity_logger, "_write_entry_sidecar", return_value=False), + patch.object( + activity_logger, "_rebuild_combined_log", return_value=True + ) as mock_rebuild, + ): + result = activity_logger.log_fix( + repo="VectorInstitute/test-repo", + pr_number=42, + pr_title="Bump dependency", + pr_author="app/dependabot", + pr_url="https://github.com/VectorInstitute/test-repo/pull/42", + workflow_run_id="123456789", + github_run_url="https://github.com/.../actions/runs/123456789", + status="SUCCESS", + failure_types=["test"], + trace_path="data/traces/test-repo/42/123456789.json", + fix_time_hours=0.5, + ) + + assert result is False + # Rebuild should not be attempted if sidecar failed + mock_rebuild.assert_not_called() + + def test_log_fix_rebuild_failure_still_returns_true(self, activity_logger): + """Test that rebuild failure returns True (activity safe in sidecar).""" + with ( + patch.object(activity_logger, "_write_entry_sidecar", return_value=True), + patch.object(activity_logger, "_rebuild_combined_log", return_value=False), + ): + result = activity_logger.log_fix( + repo="VectorInstitute/test-repo", + pr_number=42, + pr_title="Bump dependency", + pr_author="app/dependabot", + pr_url="https://github.com/VectorInstitute/test-repo/pull/42", + workflow_run_id="123456789", + github_run_url="https://github.com/.../actions/runs/123456789", + status="SUCCESS", + failure_types=["lint"], + trace_path="data/traces/test-repo/42/123456789.json", + fix_time_hours=0.25, + ) + + # Sidecar is safe — still a success from caller's perspective + assert result is True + def test_log_fix_all_status_types(self, activity_logger): """Test logging fix with different status types.""" for status in ["SUCCESS", "FAILED"]: with ( patch.object( - activity_logger, - "_load_activity_log", - return_value={"activities": [], "last_updated": None}, - ), + activity_logger, "_write_entry_sidecar", return_value=True + ) as mock_sidecar, patch.object( - activity_logger, "_save_activity_log", return_value=True - ) as mock_save, + activity_logger, "_rebuild_combined_log", return_value=True + ), ): - result = activity_logger.log_fix( + activity_logger.log_fix( repo="VectorInstitute/test-repo", pr_number=42, pr_title="Bump dependency", @@ -292,13 +447,12 @@ def test_log_fix_all_status_types(self, activity_logger): github_run_url="https://github.com/.../actions/runs/123456789", status=status, failure_types=["lint"], - trace_path="traces/2025/12/19/test-repo-pr-42.json", + trace_path="data/traces/test-repo/42/123456789.json", fix_time_hours=0.25, ) - assert result is True - saved_data = mock_save.call_args[0][0] - assert saved_data["activities"][0]["status"] == status + activity, _ = mock_sidecar.call_args[0] + assert activity["status"] == status def test_log_fix_all_failure_types(self, activity_logger): """Test logging fix with different failure types.""" @@ -315,15 +469,13 @@ def test_log_fix_all_failure_types(self, activity_logger): for ft in failure_type_list: with ( patch.object( - activity_logger, - "_load_activity_log", - return_value={"activities": [], "last_updated": None}, - ), + activity_logger, "_write_entry_sidecar", return_value=True + ) as mock_sidecar, patch.object( - activity_logger, "_save_activity_log", return_value=True - ) as mock_save, + activity_logger, "_rebuild_combined_log", return_value=True + ), ): - result = activity_logger.log_fix( + activity_logger.log_fix( repo="VectorInstitute/test-repo", pr_number=42, pr_title="Bump dependency", @@ -333,88 +485,10 @@ def test_log_fix_all_failure_types(self, activity_logger): github_run_url="https://github.com/.../actions/runs/123456789", status="SUCCESS", failure_types=[ft], - trace_path="traces/2025/12/19/test-repo-pr-42.json", + trace_path="data/traces/test-repo/42/123456789.json", fix_time_hours=0.25, ) - assert result is True - saved_data = mock_save.call_args[0][0] - assert saved_data["activities"][0]["failure_type"] == ft - assert saved_data["activities"][0]["failure_types"] == [ft] - - def test_log_fix_appends_to_existing_log(self, activity_logger): - """Test that fix activities are appended to existing log.""" - existing_log = { - "activities": [ - { - "repo": "VectorInstitute/other-repo", - "pr_number": 1, - "pr_title": "Old PR", - "pr_author": "app/dependabot", - "pr_url": "https://github.com/VectorInstitute/other-repo/pull/1", - "timestamp": "2025-12-18T10:00:00Z", - "workflow_run_id": "111111", - "github_run_url": "https://github.com/.../actions/runs/111111", - "status": "SUCCESS", - "failure_type": "lint", - "trace_path": "traces/2025/12/18/other-repo-pr-1.json", - "fix_time_hours": 0.25, - } - ], - "last_updated": "2025-12-18T10:00:00Z", - } - - with ( - patch.object( - activity_logger, "_load_activity_log", return_value=existing_log - ), - patch.object( - activity_logger, "_save_activity_log", return_value=True - ) as mock_save, - ): - activity_logger.log_fix( - repo="VectorInstitute/test-repo", - pr_number=42, - pr_title="New PR", - pr_author="app/dependabot", - pr_url="https://github.com/VectorInstitute/test-repo/pull/42", - workflow_run_id="123456789", - github_run_url="https://github.com/.../actions/runs/123456789", - status="SUCCESS", - failure_types=["test"], - trace_path="traces/2025/12/19/test-repo-pr-42.json", - fix_time_hours=0.5, - ) - - # Verify both activities are in the log - saved_data = mock_save.call_args[0][0] - assert len(saved_data["activities"]) == 2 - assert saved_data["activities"][0]["pr_number"] == 1 - assert saved_data["activities"][1]["pr_number"] == 42 - - def test_log_fix_save_failure(self, activity_logger): - """Test logging when save fails.""" - with ( - patch.object( - activity_logger, - "_load_activity_log", - return_value={"activities": [], "last_updated": None}, - ), - patch.object(activity_logger, "_save_activity_log", return_value=False), - ): - result = activity_logger.log_fix( - repo="VectorInstitute/test-repo", - pr_number=42, - pr_title="Bump dependency", - pr_author="app/dependabot", - pr_url="https://github.com/VectorInstitute/test-repo/pull/42", - workflow_run_id="123456789", - github_run_url="https://github.com/.../actions/runs/123456789", - status="SUCCESS", - failure_types=["test"], - trace_path="traces/2025/12/19/test-repo-pr-42.json", - fix_time_hours=0.5, - ) - - # Verify failure - assert result is False + activity, _ = mock_sidecar.call_args[0] + assert activity["failure_type"] == ft + assert activity["failure_types"] == [ft]