Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 185 additions & 61 deletions src/aieng_bot/observability/activity_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,41 @@
class ActivityLogger:
"""Logger for bot fix and merge activities.

Records all bot activity to a unified log file in GCS for dashboard consumption.
Every invocation of the fix command logs an activity here.
Each fix is written to a unique per-run sidecar file in GCS so that
concurrent workflow jobs never overwrite each other's entries. After writing
the sidecar, the combined ``bot_activity_log.json`` is rebuilt from all
sidecars plus any historical entries in the existing log.

Parameters
----------
bucket : str, optional
GCS bucket name (default="bot-dashboard-vectorinstitute").
log_path : str, optional
Path to activity log in GCS (default="data/bot_activity_log.json").
Path to combined activity log in GCS (default="data/bot_activity_log.json").
entries_prefix : str, optional
GCS key prefix for individual activity entry files
(default="data/activity_entries/").

Attributes
----------
bucket : str
GCS bucket name.
log_path : str
Path to activity log in GCS.
Path to combined activity log in GCS.
gcs_uri : str
Full GCS URI for combined activity log.
entries_prefix : str
GCS key prefix for individual activity entry files.
entries_gcs_prefix : str
Full GCS URI prefix for individual activity entry files.

"""

def __init__(
self,
bucket: str = "bot-dashboard-vectorinstitute",
log_path: str = "data/bot_activity_log.json",
entries_prefix: str = "data/activity_entries/",
):
"""Initialize activity logger.

Expand All @@ -46,88 +58,201 @@ def __init__(
bucket : str, optional
GCS bucket name (default="bot-dashboard-vectorinstitute").
log_path : str, optional
Path to activity log in GCS (default="data/bot_activity_log.json").
Path to combined activity log in GCS
(default="data/bot_activity_log.json").
entries_prefix : str, optional
GCS key prefix for individual activity entry files
(default="data/activity_entries/").

"""
self.bucket = bucket
self.log_path = log_path
self.gcs_uri = f"gs://{bucket}/{log_path}"
self.entries_prefix = entries_prefix
self.entries_gcs_prefix = f"gs://{bucket}/{entries_prefix}"

def _load_activity_log(self) -> dict | None: # noqa: PYI041
"""Load existing activity log from GCS.
def _gcs_upload(self, local_path: str, gcs_uri: str) -> bool:
    """Upload a local file to GCS via the gcloud CLI.

    Parameters
    ----------
    local_path : str
        Local file path to upload.
    gcs_uri : str
        Destination GCS URI.

    Returns
    -------
    bool
        True on success, False on failure.

    """
    try:
        subprocess.run(
            ["gcloud", "storage", "cp", local_path, gcs_uri],
            check=True,
            capture_output=True,
            # Decode captured output so e.stderr/e.stdout below are str;
            # without text=True they are bytes and the log message would
            # render as a bytes repr (b"...").
            text=True,
        )
        return True
    except subprocess.CalledProcessError as e:
        log_error(f"Failed to upload to {gcs_uri}: {e.stderr or e.stdout}")
        return False

def _gcs_read_json(self, gcs_uri: str) -> dict | None:  # noqa: PYI041
    """Read and parse a JSON file from GCS.

    Parameters
    ----------
    gcs_uri : str
        GCS URI to read.

    Returns
    -------
    dict | None
        Parsed JSON dict, empty structure if file does not exist (404),
        or None on any other error (caller must not overwrite).

    """
    try:
        result = subprocess.run(
            ["gcloud", "storage", "cat", gcs_uri],
            capture_output=True,
            text=True,
            check=True,
        )
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        # Distinguish "not found" from auth/network failures.
        # gcloud storage cat prints "No URLs matched" or "NotFound" for
        # missing objects.
        stderr = e.stderr or ""
        stdout = e.stdout or ""
        not_found_signals = ("No URLs matched", "NotFound", "does not exist", "404")
        if any(s in stderr or s in stdout for s in not_found_signals):
            # File doesn't exist yet — safe for the caller to start fresh.
            return {"activities": [], "last_updated": None}
        # Any other failure (auth, network, permissions) — caller must not
        # overwrite existing data based on this result.
        log_error(f"Failed to read {gcs_uri}: {stderr or stdout}")
        return None
    except json.JSONDecodeError as e:
        log_error(f"Failed to parse JSON from {gcs_uri}: {e}")
        return None

def _save_activity_log(self, log_data: dict) -> bool:
"""Save activity log to GCS.
def _write_entry_sidecar(self, activity: dict, workflow_run_id: str) -> bool:
    """Write a single activity entry to its own unique GCS file.

    Using ``workflow_run_id`` as the filename guarantees concurrent jobs
    never write to the same path, so no read-modify-write is needed here.

    Parameters
    ----------
    activity : dict
        Activity entry data.
    workflow_run_id : str
        Unique workflow run identifier, used as the sidecar filename.

    Returns
    -------
    bool
        True on success, False on failure.

    """
    gcs_uri = f"{self.entries_gcs_prefix}{workflow_run_id}.json"
    try:
        # Write the entry to a local temp file first.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, suffix=".json"
        ) as f:
            json.dump(activity, f, indent=2)
            temp_path = f.name
        try:
            return self._gcs_upload(temp_path, gcs_uri)
        finally:
            # Always remove the temp file, even if the upload raises;
            # unlinking only on the non-raising path would leak the file.
            os.unlink(temp_path)
    except Exception as e:
        log_error(f"Failed to write activity sidecar: {e}")
        return False

# Upload to GCS
subprocess.run(
["gcloud", "storage", "cp", temp_path, self.gcs_uri],
check=True,
def _list_entry_sidecars(self) -> list[str]:
    """List all activity entry sidecar files in GCS.

    Returns
    -------
    list[str]
        List of GCS URIs for all sidecar entry files.

    """
    cmd = ["gcloud", "storage", "ls", self.entries_gcs_prefix]
    try:
        listing = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        # An ls failure (e.g. the prefix does not exist yet) simply means
        # there are no sidecars to report.
        return []
    uris: list[str] = []
    for raw_line in listing.stdout.splitlines():
        uri = raw_line.strip()
        if uri.endswith(".json"):
            uris.append(uri)
    return uris

def _rebuild_combined_log(self) -> bool:
    """Rebuild ``bot_activity_log.json`` from all sidecar entries.

    Reads every sidecar under ``entries_prefix`` and merges them with any
    existing entries in the combined log (preserving historical records that
    predate the sidecar approach). Deduplicates by
    ``(repo, pr_number, workflow_run_id)`` and sorts by timestamp.

    Because this method only writes a freshly computed view of all known
    entries, concurrent calls are safe: the last writer will include all
    sidecars that existed at the time of its read, and any entries written
    concurrently will be picked up on the next rebuild.

    Returns
    -------
    bool
        True on success, False on failure.

    """
    # Collect entries from all sidecars. A sidecar that fails to read or
    # parse is skipped; it remains in GCS and is retried on the next rebuild.
    sidecar_activities: list[dict] = []
    for uri in self._list_entry_sidecars():
        data = self._gcs_read_json(uri)
        # Guard against malformed sidecars: require a dict with "repo".
        if data and isinstance(data, dict) and "repo" in data:
            sidecar_activities.append(data)

    # Collect entries from existing combined log (historical, pre-sidecar).
    existing = self._gcs_read_json(self.gcs_uri)
    if existing is None:
        # Read failed for a non-404 reason (auth/network/permissions) —
        # _gcs_read_json's contract says the caller must not overwrite.
        # Proceeding here would rebuild from sidecars only and drop
        # historical entries from the combined log.
        log_error("Aborting combined activity log rebuild after GCS read failure")
        return False
    existing_activities: list[dict] = []
    if isinstance(existing.get("activities"), list):
        existing_activities = existing["activities"]

    # Merge and deduplicate — sidecars come first in the concatenation, so
    # they are authoritative for overlapping keys.
    seen: set[tuple] = set()
    merged: list[dict] = []
    for activity in sidecar_activities + existing_activities:
        key = (
            activity.get("repo", ""),
            activity.get("pr_number", 0),
            activity.get("workflow_run_id", ""),
        )
        if key not in seen:
            seen.add(key)
            merged.append(activity)

    # Entries with a missing timestamp sort first via the "" fallback.
    merged.sort(key=lambda a: a.get("timestamp") or "")

    log_data = {
        "activities": merged,
        "last_updated": datetime.now(timezone.utc).isoformat(),
    }

    try:
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, suffix=".json"
        ) as f:
            json.dump(log_data, f, indent=2)
            temp_path = f.name
        try:
            return self._gcs_upload(temp_path, self.gcs_uri)
        finally:
            # Always remove the temp file, even if the upload raises.
            os.unlink(temp_path)
    except Exception as e:
        log_error(f"Failed to rebuild combined activity log: {e}")
        return False

def log_fix(
Expand All @@ -146,8 +271,10 @@ def log_fix(
) -> bool:
"""Log a fix and merge activity.

This is called every time the fix command processes a PR, whether it
needed fixing or just rebasing and merging.
Writes the activity to a unique sidecar file first (safe against
concurrent jobs), then rebuilds the combined log from all sidecars.
If the rebuild fails the activity is still durably recorded in the
sidecar and will be included on the next successful rebuild.

Parameters
----------
Expand Down Expand Up @@ -178,21 +305,13 @@ def log_fix(
Returns
-------
bool
True on success, False on failure.
True if the activity sidecar was written successfully (combined log
rebuild failure does not cause a False return).
False only if the sidecar write itself failed.

"""
log_info(f"Recording fix activity for {repo}#{pr_number}")

# Load existing log — returns None if load failed (auth/network error)
log_data = self._load_activity_log()
if log_data is None:
log_error(
f"Aborting activity log write for {repo}#{pr_number} "
"to prevent overwriting existing data after a GCS read failure"
)
return False

# Create activity entry with both failure_types (new) and failure_type (backward compat)
activity = {
"repo": repo,
"pr_number": pr_number,
Expand All @@ -209,18 +328,23 @@ def log_fix(
"fix_time_hours": fix_time_hours,
}

# Append activity
log_data["activities"].append(activity)
log_data["last_updated"] = datetime.now(timezone.utc).isoformat()

# Save to GCS
failure_types_str = ",".join(failure_types)
if self._save_activity_log(log_data):
log_success(
f"Fix activity recorded for {repo}#{pr_number} "
f"(status: {status}, types: {failure_types_str})"

# Step 1: Write sidecar — unique filename guarantees no concurrent clobber
if not self._write_entry_sidecar(activity, workflow_run_id):
log_error(f"Failed to write activity sidecar for {repo}#{pr_number}")
return False

log_success(
f"Fix activity sidecar written for {repo}#{pr_number} "
f"(status: {status}, types: {failure_types_str})"
)

# Step 2: Rebuild combined log — best-effort; data is safe in the sidecar
if not self._rebuild_combined_log():
log_error(
f"Failed to rebuild combined activity log after {repo}#{pr_number} "
"(activity is safe in sidecar and will appear on next rebuild)"
)
return True

log_error(f"Failed to record fix activity for {repo}#{pr_number}")
return False
return True
Loading
Loading