Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 80 additions & 3 deletions runtime_manager/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import hashlib
import mimetypes
import os
import re
import shutil
from pathlib import Path
from typing import Any

# Matches any run of characters other than ASCII letters, digits, ".", "_" and "-".
_SEGMENT_PATTERN = re.compile(r"[^A-Za-z0-9._-]+")
# Name of the directory, directly under an artifact root, that holds published snapshots.
_PUBLISHED_DIR_NAME = ".published"
# NOTE(review): presumably MIME-type prefixes treated as browsable; the consumer is not visible here - confirm.
_BROWSABLE_MIME_PREFIXES = ("text/",)
_BROWSABLE_MIME_TYPES = {
"application/pdf",
Expand Down Expand Up @@ -73,13 +76,16 @@ def file_digest(path: Path) -> tuple[int, str]:


def artifact_id_for_file(path: Path, artifact_root: Path, sha256: str) -> str:
    """Return an artifact id derived from where the file lives, not its content.

    The id is the path of ``path`` relative to ``artifact_root`` with the
    final extension removed, passed through ``safe_artifact_segment`` and
    capped at 64 characters. When ``path`` is not located under
    ``artifact_root`` the bare file name is used instead.

    Because the content hash is not part of the id, overwriting a file in
    place keeps its id.

    Args:
        path: The artifact file.
        artifact_root: Directory the id is made relative to.
        sha256: Accepted for signature compatibility only; ignored.

    Returns:
        The sanitized id, at most 64 characters long.
    """
    _ = sha256  # intentionally unused; kept so existing callers keep working
    try:
        relative = path.resolve().relative_to(artifact_root.resolve())
    except ValueError:
        # The file is outside the root: fall back to its name alone.
        relative = path.name
    relative_text = str(relative)
    suffix = Path(relative_text).suffix
    if suffix:
        # NOTE(review): files that differ only by extension end up with the
        # same id because the extension is dropped here.
        relative_text = relative_text[: -len(suffix)]
    return safe_artifact_segment(relative_text, fallback="artifact")[:64]


def infer_mime_type(path: Path) -> str:
Expand Down Expand Up @@ -122,8 +128,11 @@ def discover_artifacts(
for path in candidates:
if len(artifacts) >= max_files:
break
if _is_published_path(path, root):
continue
try:
metadata = metadata_for_artifact_file(path, root)
metadata = publish_artifact_snapshot(path, root, metadata=metadata)
except (FileNotFoundError, OSError, ValueError):
continue
artifact_id = metadata["artifactId"]
Expand All @@ -147,15 +156,83 @@ def find_artifact_file(
return None
if not root.is_dir():
return None
if found := find_published_artifact_file(root, requested_id):
return found
try:
candidates = sorted(root.rglob("*"))
except OSError:
return None
for path in candidates:
if _is_published_path(path, root):
continue
try:
metadata = metadata_for_artifact_file(path, root)
except (FileNotFoundError, OSError, ValueError):
continue
if path.stem == requested_id or metadata["artifactId"] == requested_id:
return path.resolve(), metadata
return None


def publish_artifact_snapshot(path: Path, artifact_root: Path, *, metadata: dict[str, Any] | None = None) -> dict[str, Any]:
    """Copy a source artifact into the published-snapshot area of its root.

    The file is copied to a hidden staging name inside the snapshot
    directory and then moved over the final name with ``os.replace``.

    Args:
        path: Source artifact; must resolve to a location under
            ``artifact_root`` and outside the published directory.
        artifact_root: Root directory that owns the artifact.
        metadata: Optional precomputed metadata. When missing or empty it is
            obtained from ``metadata_for_artifact_file``.

    Returns:
        A new dict holding the metadata used for the snapshot.

    Raises:
        ValueError: If ``path`` is outside ``artifact_root``, is itself a
            published snapshot, or the metadata has no usable ``artifactId``.
    """
    resolved_root = artifact_root.resolve()
    source = path.resolve()
    # Raises ValueError when the source is not located under the root.
    source.relative_to(resolved_root)
    if _is_published_path(source, resolved_root):
        raise ValueError("published artifact snapshots are not source artifacts")

    if not metadata:
        metadata = metadata_for_artifact_file(source, resolved_root)
    snapshot_metadata = dict(metadata)

    artifact_id = str(snapshot_metadata.get("artifactId") or "").strip()
    if artifact_id == "":
        raise ValueError("artifactId is required")

    target_dir = _published_root(resolved_root) / safe_artifact_segment(artifact_id, fallback="artifact")
    target_dir.mkdir(parents=True, exist_ok=True)
    final_path = target_dir / source.name
    staging_path = target_dir / f".{source.name}.{os.getpid()}.tmp"
    try:
        shutil.copyfile(source, staging_path)
        os.replace(staging_path, final_path)
    finally:
        # Best-effort removal of the staging file; after a successful replace
        # it no longer exists, which missing_ok tolerates.
        try:
            staging_path.unlink(missing_ok=True)
        except OSError:
            pass
    return snapshot_metadata


def find_published_artifact_file(
    artifact_root: Path,
    artifact_id: str,
) -> tuple[Path, dict[str, Any]] | None:
    """Look up the published snapshot stored for ``artifact_id``.

    Args:
        artifact_root: Root directory whose published area is searched.
        artifact_id: Requested id. Blank ids and ids containing a path
            separator are rejected.

    Returns:
        ``(resolved_path, metadata)`` for the first usable snapshot file in
        sorted order, or ``None`` when the id is invalid, no snapshot
        directory exists, or no file in it yields metadata.
    """
    requested_id = str(artifact_id or "").strip()
    if not requested_id or "/" in requested_id or "\\" in requested_id:
        return None
    try:
        root = artifact_root.resolve()
        published_root = _published_root(root)
        snapshot_dir = published_root / safe_artifact_segment(requested_id, fallback="artifact")
        # The id is caller-supplied and dots are legal segment characters, so
        # make sure an id such as ".." cannot select anything other than a
        # direct child of the published directory.
        is_contained = snapshot_dir.resolve().parent == published_root.resolve()
    except Exception:
        # Lookup is best-effort: any failure while building the path means
        # "not found" rather than an error for the caller.
        return None
    if not is_contained or not snapshot_dir.is_dir():
        return None
    try:
        # publish_artifact_snapshot stages copies as ".<name>.<pid>.tmp" in
        # this directory. Those names sort ahead of ordinary ones, so skip
        # them to avoid handing out a half-written or orphaned staging file.
        candidates = sorted(
            entry
            for entry in snapshot_dir.iterdir()
            if entry.is_file()
            and not (entry.name.startswith(".") and entry.name.endswith(".tmp"))
        )
    except OSError:
        return None
    for path in candidates:
        try:
            metadata = metadata_for_artifact_file(path, snapshot_dir, artifact_id=requested_id)
        except (FileNotFoundError, OSError, ValueError):
            continue
        return path.resolve(), metadata
    return None


def _published_root(artifact_root: Path) -> Path:
    """Return the published-snapshot directory located under ``artifact_root``."""
    return artifact_root.joinpath(_PUBLISHED_DIR_NAME)


def _is_published_path(path: Path, artifact_root: Path) -> bool:
    """Tell whether ``path`` resolves to the published directory or below it."""
    try:
        # Resolution happens inside the try so that a ValueError raised while
        # resolving is reported as "not published" as well.
        candidate = path.resolve()
        published = _published_root(artifact_root.resolve()).resolve()
        candidate.relative_to(published)
    except ValueError:
        return False
    return True
59 changes: 59 additions & 0 deletions tests/runtime_manager/test_report_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,40 @@ def test_report_artifact_metadata_is_discovered_from_controlled_dir(tmp_path):
assert "path" not in json.dumps(artifact, ensure_ascii=False).lower()


def test_report_artifact_snapshot_updates_same_path_without_new_asset(tmp_path):
    """Overwriting a report in place keeps its id and exposes the new content."""
    from runtime_manager.artifacts import discover_artifacts, find_artifact_file

    root = tmp_path / "artifacts"
    root.mkdir()
    source = root / "analysis.txt"

    source.write_text("first report", encoding="utf-8")
    initial = discover_artifacts(root, seen_artifact_ids=set())[0]
    known_id = initial["artifactId"]

    # Same path, new content: discovery must not report a new artifact.
    source.write_text("second report", encoding="utf-8")
    rediscovered = discover_artifacts(root, seen_artifact_ids={known_id})
    assert rediscovered == []

    served_path, served_metadata = find_artifact_file(root, known_id)
    expected_digest = hashlib.sha256(b"second report").hexdigest()
    assert served_path.read_text(encoding="utf-8") == "second report"
    assert served_metadata["artifactId"] == known_id
    assert served_metadata["sha256"] == expected_digest


def test_report_artifact_discovery_ignores_published_snapshots(tmp_path):
    """A repeated discovery pass lists only the source file."""
    from runtime_manager.artifacts import discover_artifacts

    root = tmp_path / "artifacts"
    root.mkdir()
    (root / "analysis.txt").write_text("first report", encoding="utf-8")

    # First pass is run for its side effects; its result is not needed.
    discover_artifacts(root, seen_artifact_ids=set())

    second_pass = discover_artifacts(root, seen_artifact_ids=set())
    file_names = [entry["fileName"] for entry in second_pass]
    assert file_names == ["analysis.txt"]


def test_runtime_worker_adds_report_artifact_without_stdout_truncation(tmp_path, monkeypatch):
from runtime_manager.worker_main import _safe_tool_result_fields

Expand Down Expand Up @@ -81,6 +115,31 @@ def test_runtime_manager_serves_report_artifact_by_user_session_run(tmp_path):
assert "awr.html" in response.headers["content-disposition"]


def test_runtime_manager_serves_latest_published_report_artifact_after_source_overwrite(tmp_path):
    """The HTTP endpoint returns the content written after the first discovery."""
    from runtime_manager.artifacts import discover_artifacts

    run_dir = tmp_path / "user-1" / "sessions" / "conv-1.artifacts" / "run-1"
    run_dir.mkdir(parents=True)
    source = run_dir / "analysis.txt"

    source.write_text("first report", encoding="utf-8")
    metadata = discover_artifacts(run_dir, seen_artifact_ids=set())[0]

    # Overwrite in place; the known id must not show up as a new artifact.
    source.write_text("second report", encoding="utf-8")
    rediscovered = discover_artifacts(run_dir, seen_artifact_ids={metadata["artifactId"]})
    assert rediscovered == []

    client = TestClient(create_app(users_root=tmp_path, api_key="secret"))
    response = client.get(
        f"/agent/sessions/conv-1/artifacts/{metadata['artifactId']}",
        params={"user_id": "user-1", "run_id": "run-1"},
        headers={"Authorization": "Bearer secret"},
    )

    assert response.status_code == 200
    assert response.content == b"second report"


def test_runtime_manager_rejects_artifact_path_escape(tmp_path):
from runtime_manager.artifacts import metadata_for_artifact_file

Expand Down
Loading