Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 253 additions & 21 deletions src/mantis_agent/gym/learning_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import logging
import time
from dataclasses import dataclass
from typing import Any
from typing import TYPE_CHECKING, Any

from ..verification.playbook import (
ExtractionPattern,
Expand All @@ -32,6 +32,9 @@
from ..verification.step_verifier import StepVerifier, VerificationResult
from .runner import GymRunner

if TYPE_CHECKING:
from ..sim_envs.cli_integration import EnvSession

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -65,13 +68,212 @@ def __init__(
grounding: Any = None,
on_step: Any = None,
max_attempts_per_step: int = 3,
oracle_session: "EnvSession | None" = None,
):
self.brain = brain
self.env = env
self.verifier = verifier
self.grounding = grounding
self.on_step = on_step
self.max_attempts = max_attempts_per_step
# When a sim-env session is attached, the runner replaces
# vision-based ``verify_step`` / ``verify_filter`` /
# ``verify_on_correct_page`` calls with cheap server-side
# probes against ``/__env__/mutations`` plus a URL substring
# check. See :meth:`_oracle_verify_url_signals` and
# :meth:`_oracle_verify_state_change`. The vision verifier
# stays as the fallback for extraction-iteration steps where
# the env can't surface the relevant signal (popup detection,
# gallery traps, etc.).
self.oracle_session = oracle_session
# Monotonic cursor into the env's mutations log so successive
# ``fetch_mutations`` calls only return entries from after
# the previous step.
self._last_mutation_id: int = 0

# ── Oracle-path verifier helpers (#594) ─────────────────────────────

def _env_current_url(self) -> str:
"""Best-effort fetch of the env's current URL.

Different env adapters expose the URL via different accessors:
Playwright + Modal back the property as ``current_url`` (string),
some legacy adapters use ``url`` or a ``get_url()`` method. We
try them in order and return ``""`` if none match — the oracle
verifier degrades gracefully to "verified=False" on an empty
URL rather than crashing the run.
"""
env = self.env
if env is None:
return ""
for attr in ("current_url", "url"):
val = getattr(env, attr, None)
if callable(val):
try:
return str(val() or "")
except Exception: # noqa: BLE001
continue
if val:
return str(val)
getter = getattr(env, "get_url", None)
if callable(getter):
try:
return str(getter() or "")
except Exception: # noqa: BLE001
return ""
return ""

def _oracle_verify_url_signals(
self,
signals: list[str],
) -> VerificationResult:
"""Cheap replacement for ``verify_filter`` / ``verify_on_correct_page``.

The plan-author hints (``["private seller", "by owner",
"by-owner"]`` etc.) are token substrings expected to appear in
the current URL when the page is in the right state.
Case-insensitive match; ANY signal hit counts as verified.

Returns ``VerificationResult`` with the same shape as the
vision verifier so callers don't need to branch.
"""
url = self._env_current_url().lower()
if not url:
return VerificationResult(
verified=False,
page_changed=False,
issue="no_url",
suggestion="env did not expose a URL — oracle path can't verify",
confidence=0.5,
details="empty env URL",
)
if not signals:
return VerificationResult(
verified=True,
page_changed=True,
confidence=0.5,
details=f"no signals supplied; URL={url}",
)
matches = [s for s in signals if s and s.lower() in url]
verified = bool(matches)
return VerificationResult(
verified=verified,
page_changed=True,
issue="" if verified else "filter_lost",
suggestion="" if verified else (
f"none of {signals} found in URL {url}"
),
confidence=0.9 if verified else 0.85,
details=f"matched={matches} url={url}",
)

def _oracle_verify_state_change(
self,
expected_ops: set[str] | None = None,
) -> VerificationResult:
"""Cheap replacement for ``verify_step`` on state-changing actions.

Polls ``GET /__env__/mutations?since=<cursor>``. If any entries
landed, advances the cursor and returns verified=True (or False
when ``expected_ops`` is set and no entry matches). If no
mutations landed, returns verified=False with
``issue="no_state_change"``.

Note this only works for steps the env's harness instruments
with mutations — for boattrader: lead submission, consent set,
phone revealed, env reset. Filter application + plain
navigation are URL-only; callers verify those via
:meth:`_oracle_verify_url_signals` instead.
"""
from ..sim_envs.oracle_client import fetch_mutations, last_mutation_id

if self.oracle_session is None:
# Caller should have gated on this — fail loud rather than
# silently treating the absence as "verified".
return VerificationResult(
verified=False,
page_changed=False,
issue="no_oracle_session",
confidence=0.0,
details="oracle_session is None",
)

resp = fetch_mutations(
self.oracle_session.url,
self.oracle_session.admin_token,
since_id=self._last_mutation_id,
)
if resp.get("error"):
return VerificationResult(
verified=False,
page_changed=False,
issue="oracle_unreachable",
suggestion=f"fetch_mutations failed: {resp['error']}",
confidence=0.4,
details=resp["error"],
)

mutations = resp.get("mutations", [])
if not mutations:
return VerificationResult(
verified=False,
page_changed=False,
issue="no_state_change",
suggestion="mutation log empty — agent's last action had no observable effect",
confidence=0.9,
details="empty mutation delta",
)

# Advance cursor BEFORE checking content so a non-matching delta
# still doesn't re-fire on the next poll.
self._last_mutation_id = max(self._last_mutation_id, last_mutation_id(mutations))

if expected_ops:
matched = [m for m in mutations
if str(m.get("operation") or "") in expected_ops]
verified = bool(matched)
ops_seen = [m.get("operation") for m in mutations]
return VerificationResult(
verified=verified,
page_changed=True,
issue="" if verified else "wrong_op",
suggestion="" if verified else (
f"expected one of {sorted(expected_ops)}, got {ops_seen}"
),
confidence=0.95 if verified else 0.85,
details=f"matched={[m.get('operation') for m in matched]} "
f"all={ops_seen}",
)

# No expected_ops filter — any mutation is signal of state change.
return VerificationResult(
verified=True,
page_changed=True,
confidence=0.9,
details=f"{len(mutations)} mutation(s): "
f"{[m.get('operation') for m in mutations]}",
)

def _sync_mutation_cursor(self) -> None:
"""Advance ``_last_mutation_id`` to the env's current tail
without producing a VerificationResult.

Called before a phase-start to ignore any prior mutations
(e.g. seeding, reset) so the first step's
:meth:`_oracle_verify_state_change` only sees mutations that
post-date the phase's first action.
"""
if self.oracle_session is None:
return
from ..sim_envs.oracle_client import fetch_mutations, last_mutation_id
resp = fetch_mutations(
self.oracle_session.url,
self.oracle_session.admin_token,
since_id=0,
)
muts = resp.get("mutations") or []
if muts:
self._last_mutation_id = last_mutation_id(muts)

def learn(
self,
Expand Down Expand Up @@ -141,6 +343,13 @@ def _learn_setup(
self.env.reset(task="navigate", start_url=start_url)
time.sleep(4)

# On the oracle path, sync the mutation cursor to the env's
# current tail so the first filter step's verification only
# sees mutations from the agent — not the reset that may have
# just stamped ``env_reset``.
if self.oracle_session is not None:
self._sync_mutation_cursor()

# Define atomic filter steps — positive, small instructions
atomic_filters = [
(
Expand Down Expand Up @@ -182,17 +391,25 @@ def _learn_setup(

after = self.env.screenshot()

# Verify this specific filter was applied
verify = self.verifier.verify_step(
before, after,
intent=f"Apply filter: {filter_name}",
action=f"{result.total_steps} steps",
)
# Verify this specific filter was applied. Oracle path:
# a filter click is a URL change — verify by URL pattern,
# not by mutation log (filters don't stamp mutations in
# the boattrader env). Vision path: before/after screenshot
# diff as before.
if self.oracle_session is not None:
verify = self._oracle_verify_url_signals(verify_signals)
filter_check = verify # same signal on the oracle path
else:
verify = self.verifier.verify_step(
before, after,
intent=f"Apply filter: {filter_name}",
action=f"{result.total_steps} steps",
)

# Also check page for the expected signals
filter_check = self.verifier.verify_filter(
after, verify_signals, max_results=50000,
)
# Also check page for the expected signals
filter_check = self.verifier.verify_filter(
after, verify_signals, max_results=50000,
)

step = PlaybookStep(
name=filter_name,
Expand Down Expand Up @@ -233,9 +450,12 @@ def _learn_setup(
task_id=f"learn_{filter_name}_retry",
)
after_retry = self.env.screenshot()
retry_check = self.verifier.verify_filter(
after_retry, verify_signals, max_results=50000,
)
if self.oracle_session is not None:
retry_check = self._oracle_verify_url_signals(verify_signals)
else:
retry_check = self.verifier.verify_filter(
after_retry, verify_signals, max_results=50000,
)
step.update_confidence(retry_check.verified)
if retry_check.verified:
logger.info(" Retry VERIFIED")
Expand Down Expand Up @@ -263,12 +483,19 @@ def _learn_extraction(
# Before: on search results page
before = self.env.screenshot()

# Verify we're on a results page
page_check = self.verifier.verify_on_correct_page(
before,
expected_description="Search results page with boat listings",
expected_signals=["listing", "boat", "price", "photo"],
)
# Verify we're on a results page. Oracle path: URL pattern
# check (results page URLs contain ``/boats`` or similar).
# Vision path: image-content check as before.
if self.oracle_session is not None:
page_check = self._oracle_verify_url_signals(
["/boats", "/listings", "/search", "boats/"],
)
else:
page_check = self.verifier.verify_on_correct_page(
before,
expected_description="Search results page with boat listings",
expected_signals=["listing", "boat", "price", "photo"],
)
if not page_check.verified:
logger.warning(f" Not on results page: {page_check.issue}")
playbook.known_traps.append(f"Lost results page at iteration {i}: {page_check.issue}")
Expand All @@ -293,7 +520,12 @@ def _learn_extraction(
# After: check what happened
after = self.env.screenshot()

# Verify the iteration
# Verify the iteration — extraction reads the page and
# produces no mutation, so the oracle path can't tell
# success from failure here. Vision still adds value:
# detects gallery traps / dealer-page drift / popups even
# when the URL hasn't changed. Stays on the vision path
# regardless of ``oracle_session``.
verify = self.verifier.verify_step(
before, after,
intent=f"Extract data from listing #{i}",
Expand Down
Loading
Loading