Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# vm-lifecycle-tracker
Python script that parses GH issues for potentially stale VMs
Python script that reports NetBox VMs whose `expire_date` custom field has passed or is about to pass.
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
requests==2.32.3
python-dateutil==2.9.0
265 changes: 63 additions & 202 deletions tracker.py
Original file line number Diff line number Diff line change
@@ -1,236 +1,97 @@
import json
import logging
import os
import re
from datetime import datetime, timezone
from datetime import date, datetime, timezone

import requests
from dateutil.relativedelta import relativedelta

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger(__name__)

CACHE_VERSION = 1


def parse_section(body: str, section: str) -> str:
    """Return the text under a ``## <section>`` header, with HTML comments stripped."""
    header_re = rf"##\s+{re.escape(section)}\s*\n(.*?)(?=\n##\s|\Z)"
    found = re.search(header_re, body, re.DOTALL | re.IGNORECASE)
    if found is None:
        return ""
    # Drop any <!-- ... --> hint comments left over from the issue template.
    cleaned = re.sub(r"<!--.*?-->", "", found.group(1), flags=re.DOTALL)
    return cleaned.strip()


def parse_duration(body: str, year_plus_months: int) -> tuple[str, relativedelta | None]:
    """Return (raw_value, expiry_offset). Offset is None for uncertain/indefinite."""
    stated = parse_section(body, "Resource Duration").lower()
    # Known duration phrases, checked in order: (needle, canonical label, offset).
    known = (
        ("less than 6 months", "Less than 6 months", relativedelta(months=6)),
        ("6 months to a year", "6 months to a year", relativedelta(months=12)),
        ("year+", "year+", relativedelta(months=year_plus_months)),
    )
    for needle, label, offset in known:
        if needle in stated:
            return label, offset
    if "uncertain" in stated or "indefinite" in stated:
        return "uncertain/indefinite", None
    return f"unparseable: {stated[:80]}", None


def fetch_github_issues(repo: str, token: str, since: str | None = None) -> list[dict]:
    """Fetch all non-PR issues (open and closed), optionally filtered by update time."""
    auth_headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    collected: list[dict] = []
    page = 1
    while True:
        query: dict = {"state": "all", "per_page": 100, "page": page}
        if since:
            query["since"] = since
        response = requests.get(
            f"https://api.github.com/repos/{repo}/issues",
            headers=auth_headers,
            params=query,
            timeout=30,
        )
        response.raise_for_status()
        items = response.json()
        if not items:
            # An empty page means pagination is exhausted.
            return collected
        # The issues endpoint also returns PRs; keep only true issues.
        collected.extend(item for item in items if "pull_request" not in item)
        page += 1


def check_netbox(project_name: str, netbox_url: str, token: str, issue_number: int) -> tuple[bool, str | None]:
"""Look up a VM by project name in Netbox, falling back to issue number in description."""
def fetch_vms(netbox_url: str, token: str) -> list[dict]:
headers = {"Authorization": f"Token {token}"}
endpoints = ("virtualization/virtual-machines", "dcim/devices")

# Pass 1: exact name match
if project_name:
for endpoint in endpoints:
try:
resp = requests.get(
f"{netbox_url}/api/{endpoint}/",
headers=headers,
params={"name": project_name, "limit": 1},
timeout=30,
)
if resp.status_code == 200:
results = resp.json().get("results", [])
if results:
return True, results[0]["name"]
except requests.exceptions.RequestException as e:
log.warning(f"Netbox name search failed for '{project_name}' on {endpoint}: {e}")

# Pass 2: search for issue number in description field
for endpoint in endpoints:
try:
resp = requests.get(
f"{netbox_url}/api/{endpoint}/",
headers=headers,
params={"description__icontains": f"#{issue_number}", "limit": 10},
timeout=30,
)
if resp.status_code == 200:
results = resp.json().get("results", [])
if results:
return True, results[0]["name"]
except requests.exceptions.RequestException as e:
log.warning(f"Netbox description search failed for issue #{issue_number} on {endpoint}: {e}")

return False, None


def load_cache(path: str) -> dict:
    """Read the persisted JSON cache from *path*; fall back to a fresh, empty cache."""
    try:
        if os.path.exists(path):
            with open(path) as fh:
                return json.load(fh)
    except (json.JSONDecodeError, OSError) as err:
        # A corrupt or unreadable cache is not fatal — start over.
        log.warning(f"Cache unreadable at {path}, starting fresh: {err}")
    return {"version": CACHE_VERSION, "last_run": None, "issues": {}}
url = f"{netbox_url}/api/virtualization/virtual-machines/?limit=100"
vms: list[dict] = []
while url:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
payload = resp.json()
vms.extend(payload["results"])
url = payload.get("next")
return vms


def save_cache(cache: dict, path: str) -> None:
    """Write *cache* to *path* as pretty-printed JSON, creating parent directories."""
    parent = os.path.dirname(path) or "."
    os.makedirs(parent, exist_ok=True)
    with open(path, "w") as fh:
        json.dump(cache, fh, indent=2)
def _name(obj: dict | None) -> str | None:
return obj["name"] if obj else None


def compute_expiry(entry: dict, now: datetime, year_plus_months: int) -> None:
    """Recompute lifecycle_status, expiry_date, and expired_days_ago in place."""
    stated = entry.get("duration_stated", "").lower()
    try:
        # created_at is stored as a bare ISO date; treat it as midnight UTC.
        created = datetime.fromisoformat(entry["created_at"]).replace(tzinfo=timezone.utc)
    except (ValueError, KeyError) as err:
        log.warning(f"Invalid created_at for issue #{entry.get('issue_number')}: {err}")
        entry["lifecycle_status"] = "unknown"
        return

    # Map the stated duration to an expiry offset; anything else has no expiry.
    offset = None
    if "less than 6 months" in stated:
        offset = relativedelta(months=6)
    elif "6 months to a year" in stated:
        offset = relativedelta(months=12)
    elif "year+" in stated:
        offset = relativedelta(months=year_plus_months)

    if offset is None:
        entry["lifecycle_status"] = "no_expiry"
        entry.pop("expiry_date", None)
        entry.pop("expired_days_ago", None)
        return

    expires = created + offset
    overdue_days = (now - expires).days
    entry["expiry_date"] = expires.date().isoformat()
    entry["expired_days_ago"] = overdue_days
    entry["lifecycle_status"] = "expired" if overdue_days >= 0 else "active"


def process_issue(issue: dict, year_plus_months: int) -> dict | None:
"""Parse a GitHub issue into a cache entry. Returns None if not a VM request."""
body = issue.get("body") or ""
if "## Resource Duration" not in body:
return None
try:
created_at = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
except ValueError as e:
log.warning(f"Issue #{issue['number']} has unparseable created_at: {e}")
return None

duration_raw, _ = parse_duration(body, year_plus_months)
def build_entry(vm: dict) -> dict:
cf = vm.get("custom_fields") or {}
return {
"issue_number": issue["number"],
"issue_url": issue["html_url"],
"project_name": parse_section(body, "Project Name"),
"team": parse_section(body, "Team Owner"),
"contact": parse_section(body, "Team Contact"),
"created_at": created_at.date().isoformat(),
"duration_stated": duration_raw,
"name": vm["name"],
"netbox_url": vm.get("display_url"),
"status": (vm.get("status") or {}).get("value"),
"tenant": _name(vm.get("tenant")),
"cluster": _name(vm.get("cluster")),
"site": _name(vm.get("site")),
"tags": [t["slug"] for t in vm.get("tags") or []],
"description": vm.get("description") or None,
"environment": cf.get("environment"),
"project": cf.get("project"),
"expire_date": cf.get("expire_date"),
}


def main() -> None:
github_token = os.environ["GITHUB_TOKEN"]
netbox_token = os.environ["NETBOX_TOKEN"]
github_repo = os.environ["GITHUB_REPO"]
token = os.environ["NETBOX_TOKEN"]
netbox_url = os.environ.get("NETBOX_URL", "https://netbox.ethquokkaops.io").rstrip("/")
year_plus_months = int(os.environ.get("THRESHOLD_YEAR_PLUS_MONTHS", "18"))
cache_file = os.environ.get("CACHE_FILE", "/data/cache.json")

now = datetime.now(timezone.utc)
cache = load_cache(cache_file)
last_run = cache.get("last_run")
expiring_soon_days = int(os.environ.get("EXPIRING_SOON_DAYS", "7"))

# Step 1: fetch new and updated issues from GitHub since last run
log.info(f"Fetching issues from {github_repo} (since {last_run or 'beginning'})")
today = datetime.now(timezone.utc).date()
try:
updated_issues = fetch_github_issues(github_repo, github_token, since=last_run)
vms = fetch_vms(netbox_url, token)
except requests.exceptions.RequestException as e:
log.error(f"GitHub API request failed: {e}")
log.error(f"NetBox API request failed: {e}")
raise SystemExit(1)
log.info(f"Fetched {len(updated_issues)} new/updated issues")
log.info(f"Fetched {len(vms)} VMs from {netbox_url}")

for issue in updated_issues:
entry = process_issue(issue, year_plus_months)
if entry:
cache["issues"][str(issue["number"])] = entry

# Step 2: recheck Netbox and recompute expiry for all cached issues
log.info(f"Rechecking {len(cache['issues'])} cached issues")
expired, no_expiry, not_found = [], [], []

for entry in cache["issues"].values():
netbox_match, netbox_vm = check_netbox(entry["project_name"], netbox_url, netbox_token, entry["issue_number"])
entry["netbox_match"] = netbox_match
entry["netbox_vm"] = netbox_vm
entry["last_checked"] = now.isoformat()

compute_expiry(entry, now, year_plus_months)

status = entry.get("lifecycle_status")
if not netbox_match:
not_found.append(entry)
elif status == "no_expiry":
expired, expiring_soon, no_expiry = [], [], []
for vm in vms:
entry = build_entry(vm)
raw = entry["expire_date"]
if not raw:
no_expiry.append(entry)
elif status == "expired":
continue
try:
expire = date.fromisoformat(raw)
except ValueError:
log.warning(f"VM '{entry['name']}' has unparseable expire_date: {raw!r}")
no_expiry.append(entry)
continue
delta = (expire - today).days
entry["days_until_expiry"] = delta
if delta < 0:
expired.append(entry)

expired.sort(key=lambda e: e.get("expired_days_ago", 0), reverse=True)

cache["last_run"] = now.isoformat()
save_cache(cache, cache_file)

log.info(f"Expired: {len(expired)}, no expiry: {len(no_expiry)}, not in Netbox: {len(not_found)}")
elif delta < expiring_soon_days:
expiring_soon.append(entry)

expired.sort(key=lambda e: e["days_until_expiry"])
expiring_soon.sort(key=lambda e: e["days_until_expiry"])
no_expiry.sort(key=lambda e: e["name"])

log.info(
f"Expired: {len(expired)}, "
f"expiring within {expiring_soon_days}d: {len(expiring_soon)}, "
f"no expiry set: {len(no_expiry)}"
)
print(json.dumps({
"generated_at": now.isoformat(),
"generated_at": datetime.now(timezone.utc).isoformat(),
"netbox_url": netbox_url,
"expiring_soon_days": expiring_soon_days,
"expired": expired,
"expiring_soon": expiring_soon,
"no_expiry": no_expiry,
"not_found": not_found,
}, indent=2))


Expand Down
Loading