Skip to content
42 changes: 24 additions & 18 deletions src/skidl/schematics/place.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,15 @@ def add_placement_bboxes(parts, **options):
part.place_bbox.add(part.lbl_bbox)

# Compute the routing area for each side based on the number of pins on each side.
# Skip pins on deferred-stub nets — they haven't been classified yet and
# inflating bbox for them scatters placement, making post-placement distance
# checks fail.
padding = {"U": 1, "D": 1, "L": 1, "R": 1} # Min padding of 1 channel per side.
for pin in part:
if pin.stub is False and pin.is_connected():
net = pin.net
if getattr(net, "_deferred_stub", False):
continue
padding[pin.orientation] += 1

# expansion_factor > 1 is used to expand the area for routing around each part,
Expand Down Expand Up @@ -1643,11 +1649,11 @@ def _auto_stub_large_groups(node, groups, internal_nets, **options):
if len(group) <= max_group:
continue

# Collect low-fanout internal nets in this group (chain links
# and small-fanout connections). Prioritize 2-pin nets (chains),
# then 3-pin, then 4-pin.
# Collect low-fanout internal nets in this group, split into
# non-deferred (safe to stub) and deferred (want to keep connected).
group_ids = {id(p) for p in group}
chain_nets = []
safe_nets = []
deferred_nets = []
for net in internal_nets:
if getattr(net, "_stub_explicit", False) or getattr(
net, "stub", False
Expand All @@ -1657,30 +1663,30 @@ def _auto_stub_large_groups(node, groups, internal_nets, **options):
id(p.part) for p in net.pins if id(p.part) in group_ids
}
if 2 <= len(net_parts) <= 4:
chain_nets.append((len(net_parts), net))
if getattr(net, "_deferred_stub", False):
deferred_nets.append((len(net_parts), net))
else:
safe_nets.append((len(net_parts), net))

# Sort by fanout (prefer stubbing 2-pin nets first).
chain_nets.sort(key=lambda x: x[0])
chain_nets = [net for _, net in chain_nets]
safe_nets.sort(key=lambda x: x[0])
deferred_nets.sort(key=lambda x: x[0])
chain_nets = [net for _, net in safe_nets] + [net for _, net in deferred_nets]

if not chain_nets:
continue

n_cuts = max(1, (len(group) + max_group - 1) // max_group)

active_logger.info(
f" [auto_stub_large_groups] Group of {len(group)} parts, "
f"{len(chain_nets)} chain nets, splitting..."
f"{len(chain_nets)} chain nets ({len(safe_nets)} safe), splitting..."
)

# Stub evenly-spaced chain nets to split the group into chunks
# of ~max_group parts. We need ceil(len/max)-1 cuts minimum.
n_cuts = max(1, (len(group) + max_group - 1) // max_group)
step = max(1, len(chain_nets) // (n_cuts + 1))
if step < 1:
step = 1
# Stub non-deferred nets first, then deferred only if needed.
stubbed = 0

for i in range(step, len(chain_nets), step):
net = chain_nets[i]
for net in chain_nets:
if stubbed >= n_cuts:
break
net._stub = True
net._stub_explicit = False
for p in net.get_pins():
Expand Down
158 changes: 156 additions & 2 deletions src/skidl/schematics/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,109 @@ class SwitchboxRoutingFailure(RoutingFailure):
pass


class _RoutingTimeout(RoutingFailure):
"""Raised when routing exceeds its time budget."""
pass


def _route_with_timeout(child, timeout_sec=30, **kwargs):
"""Route a child node with a SIGALRM timeout.

Falls back to no-timeout on non-Unix or when SIGALRM is unavailable.
"""
import signal
handler = None
try:
def _alarm_handler(signum, frame):
raise _RoutingTimeout(f"routing timed out after {timeout_sec}s")
handler = signal.signal(signal.SIGALRM, _alarm_handler)
signal.alarm(timeout_sec)
child.route(**kwargs)
finally:
signal.alarm(0)
if handler is not None:
signal.signal(signal.SIGALRM, handler)


def _rescue_close_nets(child, **options):
"""Un-stub simple internal nets that are close enough to wire directly.

Two-phase approach:
1. Find all auto-stubbed nets with few internal pins (candidates)
2. Keep only those whose pins are close together after placement

NetTerminal parts are excluded from pin counts (schematic artifacts).
If routing still fails, the caller undoes rescue and falls back to
progressive stubbing.

Returns list of (net, [pins]) tuples for later undo if routing fails.
"""
from skidl.schematics.net_terminal import NetTerminal

max_wire_pins = options.get("auto_stub_max_wire_pins", 3)
max_wire_dist = options.get("auto_stub_max_wire_dist", 2000)
child_parts = set(child.parts)
rescued = []
seen = set()

for part in child.parts:
for pin in part:
if not pin.is_connected():
continue
net = pin.net
if id(net) in seen:
continue
seen.add(id(net))

if not getattr(net, "_stub", False):
continue
if getattr(net, "_stub_explicit", False):
continue

internal_pins = [p for p in net.pins if p.part in child_parts]
real_pins = [p for p in internal_pins
if not isinstance(p.part, NetTerminal)]
if len(real_pins) < 2 or len(real_pins) > max_wire_pins:
continue

pts = []
for p in real_pins:
pin_pt = getattr(p, "place_pt", getattr(p, "pt", Point(p.x, p.y)))
part_tx = getattr(p.part, "tx", None)
if part_tx:
pts.append(pin_pt * part_tx)
else:
pts.append(pin_pt)

max_dist = 0
for i, a in enumerate(pts):
for b in pts[i + 1:]:
dist = abs(a.x - b.x) + abs(a.y - b.y)
if dist > max_dist:
max_dist = dist

if max_dist <= max_wire_dist:
for p in internal_pins:
p.stub = False
rescued.append((net, internal_pins))

if rescued:
from skidl.logger import active_logger
active_logger.info(
f" [rescue] {len(rescued)} nets un-stubbed in "
f"{getattr(child, 'name', '?')} for direct wiring"
)

return rescued


def _undo_rescue(rescued):
"""Re-stub pins that were rescued if routing fails."""
for net, pins in rescued:
for p in pins:
p.stub = True


class Boundary:
"""Class for indicating a boundary.

Expand Down Expand Up @@ -3136,9 +3239,60 @@ def route(node, tool=None, **options):
node.rmv_routing_stuff()

# First, recursively route any children of this node.
# TODO: Child nodes are independent so could they be processed in parallel?
route_timeout = options.get("route_timeout", 30)
for child in node.children.values():
child.route(tool=tool, **options)
rescued = _rescue_close_nets(child, **options)
try:
_route_with_timeout(child, timeout_sec=route_timeout, tool=tool, **options)
continue
except (RoutingFailure, KeyError):
pass

# Undo rescue before falling back.
if rescued:
_undo_rescue(rescued)
child.rmv_routing_stuff()
try:
_route_with_timeout(child, timeout_sec=route_timeout, tool=tool, **options)
continue
except (RoutingFailure, KeyError):
pass

# Progressive stubbing: stub largest nets first.
child_parts = set(child.parts)
unstubbed = []
seen = set()
for part in child.parts:
for pin in part:
if pin.stub or not pin.is_connected():
continue
net = pin.net
if id(net) in seen:
continue
seen.add(id(net))
n_pins = sum(1 for p in net.pins if p.part in child_parts)
unstubbed.append((n_pins, net))
unstubbed.sort(key=lambda x: x[0], reverse=True)
recovered = False
for threshold in [4, 3, 2]:
to_stub = [n for pins, n in unstubbed
if pins >= threshold and not getattr(n, "_stub", False)]
if not to_stub:
continue
for net in to_stub:
net._stub = True
for pin in net.get_pins():
pin.stub = True
child.rmv_routing_stuff()
try:
_route_with_timeout(child, timeout_sec=route_timeout, tool=tool, **options)
recovered = True
break
except (RoutingFailure, KeyError):
pass
if not recovered:
child.rmv_routing_stuff()
_route_with_timeout(child, timeout_sec=route_timeout, tool=tool, **options)

# Exit if no parts to route in this node.
if not node.parts:
Expand Down
2 changes: 1 addition & 1 deletion src/skidl/schematics/sch_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def get_internal_nets(self):

processed_nets.append(net)

# Skip stubbed nets.
# Skip stubbed nets — they use labels, not wires.
if getattr(net, "stub", False) is True:
continue

Expand Down
Loading