Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 100 additions & 32 deletions agent_fox/engine/graph_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,53 +34,60 @@ def _spec_number(spec_name: str) -> tuple[int, str]:
return (float("inf"), spec_name) # type: ignore[return-value]


def _interleave_by_spec(
ready: list[str],
duration_hints: dict[str, int] | None = None,
) -> list[str]:
"""Order ready tasks with spec-fair round-robin interleaving.
def _is_auto_pre(node_id: str) -> bool:
"""Check if a node is an auto_pre archetype (group 0).

1. Group tasks by spec name (everything before first ':' in node ID).
2. Sort spec groups by spec number ascending (numeric prefix).
3. Within each group, sort by duration descending (if hints), else
alphabetically. Hinted tasks come before unhinted tasks.
4. Interleave across groups: take one from each spec per round.
Group 0 is reserved for auto_pre archetype nodes (pre-review,
drift-review, skeptic, etc.). Coder groups start at 1.

Args:
ready: List of ready node IDs.
duration_hints: Optional mapping of node_id -> predicted duration ms.
Requirements: 69-REQ-1.1
"""
parts = node_id.split(":")
return len(parts) >= 2 and parts[1] == "0"

Returns:
Spec-fair-ordered list of node IDs.

Requirements: 69-REQ-1.1, 69-REQ-1.3, 69-REQ-2.1, 69-REQ-2.2, 69-REQ-2.3
def _spec_round_robin(
tasks: list[str],
duration_hints: dict[str, int] | None = None,
fan_out_weights: dict[str, int] | None = None,
) -> list[str]:
"""Group by spec, sort within groups, and round-robin interleave.

Args:
tasks: List of node IDs to interleave.
duration_hints: Optional per-node duration hints.
fan_out_weights: Optional per-spec fan-out weights. When
provided, specs are sorted by fan-out descending (highest
impact first) with ties broken by spec number ascending.
"""
if not ready:
if not tasks:
return []

# Group tasks by spec name
groups: dict[str, list[str]] = {}
for node_id in ready:
for node_id in tasks:
spec = _spec_name(node_id)
groups.setdefault(spec, []).append(node_id)

# Sort spec groups by spec number ascending
sorted_specs = sorted(groups.keys(), key=_spec_number)
if fan_out_weights:
sorted_specs = sorted(
groups.keys(),
key=lambda s: (-fan_out_weights.get(s, 0), *_spec_number(s)),
)
else:
sorted_specs = sorted(groups.keys(), key=_spec_number)

# Sort within each group
sorted_groups: list[list[str]] = []
for spec in sorted_specs:
tasks = groups[spec]
spec_tasks = groups[spec]
if duration_hints:
hinted = [(t, duration_hints[t]) for t in tasks if t in duration_hints]
unhinted = [t for t in tasks if t not in duration_hints]
hinted = [(t, duration_hints[t]) for t in spec_tasks if t in duration_hints]
unhinted = [t for t in spec_tasks if t not in duration_hints]
hinted.sort(key=lambda x: x[1], reverse=True)
unhinted.sort()
sorted_groups.append([t for t, _ in hinted] + unhinted)
else:
sorted_groups.append(sorted(tasks))
sorted_groups.append(sorted(spec_tasks))

# Round-robin interleave across groups
result: list[str] = []
queues = [list(g) for g in sorted_groups]
while any(queues):
Expand All @@ -91,6 +98,49 @@ def _interleave_by_spec(
return result


def _interleave_by_spec(
    ready: list[str],
    duration_hints: dict[str, int] | None = None,
    fan_out_weights: dict[str, int] | None = None,
) -> list[str]:
    """Order ready tasks: pre-review tier first, then the regular tier.

    The ready list is split into two tiers, each interleaved round-robin
    across spec groups by ``_spec_round_robin``:

    1. **Pre-review tier** (nodes for which ``_is_auto_pre`` is true):
       ordered with ``fan_out_weights`` so that high-fan-out specs
       surface blockers first.
    2. **Regular tier** (every other node): ordered without fan-out
       weights, i.e. by spec number ascending.

    All pre-review tasks are emitted before any regular task.

    Args:
        ready: List of ready node IDs.
        duration_hints: Optional mapping of node_id -> predicted duration ms;
            forwarded to both tiers.
        fan_out_weights: Optional mapping of spec_name -> fan-out weight
            (count of distinct downstream specs); applied to the
            pre-review tier only.

    Returns:
        Pre-review-prioritized, spec-fair-ordered list of node IDs.
        An empty input yields an empty list.

    Requirements: 69-REQ-1.1, 69-REQ-1.3, 69-REQ-2.1, 69-REQ-2.2, 69-REQ-2.3
    """
    # Single pass: route each node into its tier, preserving input order.
    pre_tier: list[str] = []
    regular_tier: list[str] = []
    for node_id in ready:
        bucket = pre_tier if _is_auto_pre(node_id) else regular_tier
        bucket.append(node_id)

    ordered: list[str] = []
    if pre_tier:
        # Only the pre-review tier is weighted by spec fan-out.
        ordered += _spec_round_robin(pre_tier, duration_hints, fan_out_weights)
    if regular_tier:
        ordered += _spec_round_robin(regular_tier, duration_hints)
    return ordered


class GraphSync:
"""Graph state propagation: ready detection, cascade blocking.

Expand Down Expand Up @@ -137,6 +187,10 @@ def ready_tasks(
- Its status is ``pending``
- All of its dependencies have status ``completed``

Pre-review nodes (auto_pre at group 0) are prioritized ahead of
coder nodes, with high-fan-out specs ordered first so that
critical-path blockers surface early.

Args:
duration_hints: Optional mapping of node_id to predicted
duration in milliseconds. When provided, ready tasks are
Expand All @@ -145,10 +199,8 @@ def ready_tasks(
of duration hints.

Returns:
List of ready node_ids in spec-fair round-robin order.
Spec groups are ordered by numeric prefix ascending; within
each spec group tasks are sorted alphabetically or by
duration descending when hints are provided.
List of ready node_ids in pre-review-prioritized,
spec-fair round-robin order.

Requirements: 39-REQ-1.1, 39-REQ-1.3, 69-REQ-1.1, 69-REQ-2.2
"""
Expand All @@ -160,7 +212,23 @@ def ready_tasks(
if all(self.node_states.get(d) == "completed" for d in deps):
ready.append(node_id)

return _interleave_by_spec(ready, duration_hints)
fan_out = self._compute_spec_fan_out()
return _interleave_by_spec(ready, duration_hints, fan_out)

def _compute_spec_fan_out(self) -> dict[str, int]:
    """Compute, per spec, how many other specs depend on it.

    Walks ``self._dependents`` (node ID -> dependent node IDs) and, for
    each source node, collects the specs of its dependents that differ
    from the source node's own spec.

    Returns:
        Mapping of spec name -> number of distinct OTHER specs that have
        at least one node depending on a node of that spec. Specs with
        no cross-spec dependents are absent from the mapping.
    """
    downstream_specs: dict[str, set[str]] = {}
    for source_node, children in self._dependents.items():
        source_spec = _spec_name(source_node)
        # Intra-spec edges do not count toward fan-out.
        foreign_specs = {_spec_name(child) for child in children} - {source_spec}
        if foreign_specs:
            downstream_specs.setdefault(source_spec, set()).update(foreign_specs)
    return {name: len(others) for name, others in downstream_specs.items()}

def predecessors(self, node_id: str) -> list[str]:
"""Return predecessor node IDs for *node_id*."""
Expand Down
7 changes: 7 additions & 0 deletions agent_fox/graph/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,16 @@ def _add_cross_spec_edges(

# Propagate to direct intra-spec predecessors that are review nodes.
# This covers auto_pre nodes (skeptic/oracle) that gate the target.
#
# Exception: reviewer:pre-review nodes are exempt — they validate
# spec content (requirements, design), not upstream implementation,
# so they can run before upstream specs complete. Running them
# early surfaces blockers before coder work begins (fixes #476).
for pred_id in intra_preds.get(target_id, []):
pred_node = nodes.get(pred_id)
if pred_node is not None and pred_node.archetype != "coder":
if pred_node.archetype == "reviewer" and pred_node.mode == "pre-review":
continue
edges.append(Edge(source=source_id, target=pred_id, kind="cross_spec"))

return edges
Expand Down
94 changes: 55 additions & 39 deletions tests/property/engine/test_spec_fair_scheduling_props.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from hypothesis import given, settings
from hypothesis import strategies as st

from agent_fox.engine.graph_sync import _interleave_by_spec, _spec_name
from agent_fox.engine.graph_sync import _interleave_by_spec, _is_auto_pre, _spec_name

# ---------------------------------------------------------------------------
# Hypothesis strategies
Expand Down Expand Up @@ -156,7 +156,10 @@ def _spec_number(spec_name: str) -> tuple[int | float, str]:


class TestFairnessGuarantee:
"""Property test: every spec's first task appears in the first N positions.
"""Property test: within each tier, every spec's first task appears in the first N positions.

Pre-review nodes (group 0) form a separate priority tier. Fairness
is guaranteed within each tier independently.

Test Spec: TS-69-P1
Properties: Property 1 from design.md
Expand All @@ -166,18 +169,23 @@ class TestFairnessGuarantee:
@given(ready=multi_spec_list(min_specs=2, max_specs=10))
@settings(max_examples=200)
def test_fairness_guarantee(self, ready: list[str]) -> None:
"""Every spec's first task must appear within the first N positions."""
"""Within each tier, every spec's first task appears in the first N positions."""
result = _interleave_by_spec(ready)
specs = list({_spec_name(nid) for nid in ready})
n = len(specs)

for spec in specs:
first_index = next(i for i, nid in enumerate(result) if _spec_name(nid) == spec)
assert first_index < n, (
f"Spec '{spec}' first appears at index {first_index}, "
f"but should appear within first {n} positions. "
f"Result: {result}"
)

for tier_filter, label in [(_is_auto_pre, "pre"), (lambda n: not _is_auto_pre(n), "regular")]:
tier_tasks = [nid for nid in result if tier_filter(nid)]
specs = list({_spec_name(nid) for nid in tier_tasks})
n = len(specs)
if n == 0:
continue

for spec in specs:
first_index = next(i for i, nid in enumerate(tier_tasks) if _spec_name(nid) == spec)
assert first_index < n, (
f"[{label}] Spec '{spec}' first appears at index {first_index}, "
f"but should appear within first {n} positions. "
f"Tier tasks: {tier_tasks}"
)


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -207,7 +215,7 @@ def test_single_spec_identity(self, ready: list[str]) -> None:


class TestDurationPreservesWithinSpecOrder:
"""Property test: within each spec, tasks are in duration-descending order.
"""Property test: within each spec and tier, tasks are in duration-descending order.

Test Spec: TS-69-P3
Properties: Property 3 from design.md
Expand All @@ -217,34 +225,35 @@ class TestDurationPreservesWithinSpecOrder:
@given(spec_list_with_hints())
@settings(max_examples=200)
def test_duration_preserves_within_spec_order(self, args: tuple[list[str], dict[str, int]]) -> None:
"""Within each spec, tasks are ordered by duration descending in result."""
"""Within each spec and tier, tasks are ordered by duration descending."""
ready, hints = args
if not ready:
return

result = _interleave_by_spec(ready, duration_hints=hints if hints else None)
specs = list({_spec_name(nid) for nid in ready})

for spec in specs:
spec_tasks_in_result = [nid for nid in result if _spec_name(nid) == spec]
# Hinted tasks should come before unhinted, hinted in descending order
hinted = [(nid, hints[nid]) for nid in spec_tasks_in_result if nid in hints]
durations = [d for _, d in hinted]
assert durations == sorted(durations, reverse=True), (
f"Spec '{spec}' hinted tasks not in descending duration order: "
f"{hinted} in result {spec_tasks_in_result}"
)

# All hinted tasks should appear before unhinted tasks
unhinted_indices = [i for i, nid in enumerate(spec_tasks_in_result) if nid not in hints]
hinted_indices = [i for i, nid in enumerate(spec_tasks_in_result) if nid in hints]
if hinted_indices and unhinted_indices:
assert max(hinted_indices) < min(unhinted_indices), (
f"Spec '{spec}': hinted tasks don't all precede unhinted tasks. "
f"Hinted indices: {hinted_indices}, "
f"unhinted indices: {unhinted_indices}"
for tier_filter in [_is_auto_pre, lambda n: not _is_auto_pre(n)]:
tier_tasks = [nid for nid in result if tier_filter(nid)]
specs = list({_spec_name(nid) for nid in tier_tasks})

for spec in specs:
spec_tasks_in_result = [nid for nid in tier_tasks if _spec_name(nid) == spec]
hinted = [(nid, hints[nid]) for nid in spec_tasks_in_result if nid in hints]
durations = [d for _, d in hinted]
assert durations == sorted(durations, reverse=True), (
f"Spec '{spec}' hinted tasks not in descending duration order: "
f"{hinted} in result {spec_tasks_in_result}"
)

unhinted_indices = [i for i, nid in enumerate(spec_tasks_in_result) if nid not in hints]
hinted_indices = [i for i, nid in enumerate(spec_tasks_in_result) if nid in hints]
if hinted_indices and unhinted_indices:
assert max(hinted_indices) < min(unhinted_indices), (
f"Spec '{spec}': hinted tasks don't all precede unhinted tasks. "
f"Hinted indices: {hinted_indices}, "
f"unhinted indices: {unhinted_indices}"
)


# ---------------------------------------------------------------------------
# TS-69-P4: Completeness
Expand Down Expand Up @@ -285,7 +294,11 @@ def test_completeness(self, ready: list[str]) -> None:


class TestSpecOrderConsistency:
"""Property test: lower-numbered specs appear before higher-numbered specs.
"""Property test: within the regular tier, lower-numbered specs appear first.

Pre-review nodes (group 0) form a separate priority tier and are
exempt from spec-number ordering (they use fan-out ordering instead).
The spec-number consistency property applies to the regular tier.

Test Spec: TS-69-P5
Properties: Property 5 from design.md
Expand All @@ -295,14 +308,17 @@ class TestSpecOrderConsistency:
@given(ready=multi_spec_list(min_specs=2, max_specs=10))
@settings(max_examples=200)
def test_spec_order_consistency(self, ready: list[str]) -> None:
"""For specs A < B by number, A's first task appears before B's first task."""
"""Within the regular tier, A < B by number implies A's first task appears before B's."""
result = _interleave_by_spec(ready)
specs = list({_spec_name(nid) for nid in ready})
regular_result = [nid for nid in result if not _is_auto_pre(nid)]
specs = list({_spec_name(nid) for nid in regular_result})
if len(specs) < 2:
return
specs_sorted = sorted(specs, key=_spec_number)

first_indices: dict[str, int] = {}
for spec in specs_sorted:
first_indices[spec] = next(i for i, nid in enumerate(result) if _spec_name(nid) == spec)
first_indices[spec] = next(i for i, nid in enumerate(regular_result) if _spec_name(nid) == spec)

for i in range(len(specs_sorted)):
for j in range(i + 1, len(specs_sorted)):
Expand All @@ -312,7 +328,7 @@ def test_spec_order_consistency(self, ready: list[str]) -> None:
f"Spec '{spec_a}' (number {_spec_number(spec_a)}) first appears "
f"at index {first_indices[spec_a]}, but spec '{spec_b}' "
f"(number {_spec_number(spec_b)}) first appears at "
f"index {first_indices[spec_b]}. Result: {result}"
f"index {first_indices[spec_b]}. Regular result: {regular_result}"
)


Expand Down
Loading