From 83df926456e6ae10bd3de9f91605f05e66e313df Mon Sep 17 00:00:00 2001 From: Sean Evans Date: Tue, 21 Apr 2026 12:04:15 -0400 Subject: [PATCH] Refactor SandboxThread initialization state wiring --- pyisolate/runtime/thread.py | 117 +++++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 47 deletions(-) diff --git a/pyisolate/runtime/thread.py b/pyisolate/runtime/thread.py index 8634391..49ec62f 100644 --- a/pyisolate/runtime/thread.py +++ b/pyisolate/runtime/thread.py @@ -402,9 +402,8 @@ def _merge_allowed_imports(policy, allowed_imports: Optional[Iterable[str]]): return imports return None - def __init__( + def _init_config_wiring( self, - name: str, policy=None, cpu_ms: Optional[int] = None, mem_bytes: Optional[int] = None, @@ -413,18 +412,11 @@ def __init__( network_ops_max: Optional[int] = None, output_bytes_max: Optional[int] = None, child_work_max: Optional[int] = None, - allowed_imports: Optional[list[str]] = None, - on_violation: Optional[Callable[[str, Exception], None]] = None, - tracer: Optional["Tracer"] = None, + allowed_imports: Optional[Iterable[str]] = None, numa_node: Optional[int] = None, cgroup_path=None, capabilities: Optional[dict[str, Any]] = None, - ): - super().__init__(name=name, daemon=True) - self._logger = logging.getLogger(f"pyisolate.{name}") - self._inbox: "queue.Queue[Any]" = queue.Queue() - self._outbox: "queue.Queue[Any]" = queue.Queue() - self._stop_event = threading.Event() + ) -> None: self.policy = policy self.cpu_quota_ms = cpu_ms self.mem_quota_bytes = mem_bytes @@ -434,28 +426,71 @@ def __init__( self.output_bytes_max = output_bytes_max self.child_work_max = child_work_max self.allowed_imports = self._merge_allowed_imports(policy, allowed_imports) + self.numa_node = numa_node + self._bound_numa_node = None + self._cgroup_path = cgroup_path + self._capabilities = deserialize_capabilities(capabilities) + + def _reset_runtime_state(self) -> None: self._cpu_time = 0.0 self._mem_peak = 0 - self.numa_node = numa_node - self._bound_numa_node: int | None = None self._mem_base = 0 - self._start_time: float | None = None - self._on_violation = on_violation - self._tracer = tracer or Tracer() + self._start_time = None self._ops = 0 self._errors = 0 self._latency = {"0.5": 0, "1": 0, "5": 0, "10": 0, "inf": 0} self._latency_sum = 0.0 - self._cgroup_path = cgroup_path self._trace_enabled = False self._syscall_log: list[str] = [] - self._capabilities = deserialize_capabilities(capabilities) - self._quarantine_reason: str | None = None - self.termination_reason: str | None = None + self._quarantine_reason = None + self.termination_reason = None self._open_files = 0 self._network_ops = 0 self._output_bytes = 0 self._child_work = 0 + + def __init__( + self, + name: str, + policy=None, + cpu_ms: Optional[int] = None, + mem_bytes: Optional[int] = None, + wall_time_ms: Optional[int] = None, + open_files_max: Optional[int] = None, + network_ops_max: Optional[int] = None, + output_bytes_max: Optional[int] = None, + child_work_max: Optional[int] = None, + allowed_imports: Optional[list[str]] = None, + on_violation: Optional[Callable[[str, Exception], None]] = None, + tracer: Optional["Tracer"] = None, + numa_node: Optional[int] = None, + cgroup_path=None, + capabilities: Optional[dict[str, Any]] = None, + ): + super().__init__(name=name, daemon=True) + self._logger = logging.getLogger(f"pyisolate.{name}") + self._inbox: "queue.Queue[Any]" = queue.Queue() + self._outbox: "queue.Queue[Any]" = queue.Queue() + self._stop_event = threading.Event() + self._on_violation = on_violation + self._tracer = tracer or Tracer() + self._init_config_wiring( + policy=policy, + cpu_ms=cpu_ms, + mem_bytes=mem_bytes, + wall_time_ms=wall_time_ms, + open_files_max=open_files_max, + network_ops_max=network_ops_max, + output_bytes_max=output_bytes_max, + child_work_max=child_work_max, + allowed_imports=allowed_imports, + numa_node=numa_node, + cgroup_path=cgroup_path, + capabilities=capabilities, + ) + self._reset_runtime_state() + # Dedup set spans sandbox lifetimes for this thread; message IDs are monotonic + # and intentionally preserved across reset() to avoid stale replay collisions. self._next_attach_msg_id = 1 self._seen_attach_msg_ids: set[int] = set() @@ -619,33 +654,21 @@ def reset( """Reuse this thread for a new sandbox.""" old_path = getattr(self, "_cgroup_path", None) self.name = name - self.policy = policy - self.cpu_quota_ms = cpu_ms - self.mem_quota_bytes = mem_bytes - self.wall_time_ms = wall_time_ms - self.open_files_max = open_files_max - self.network_ops_max = network_ops_max - self.output_bytes_max = output_bytes_max - self.child_work_max = child_work_max - self.numa_node = numa_node - self._bound_numa_node = None - self.allowed_imports = self._merge_allowed_imports(policy, allowed_imports) - self._cpu_time = 0.0 - self._mem_peak = 0 - self._ops = 0 - self._errors = 0 - self._latency = {"0.5": 0, "1": 0, "5": 0, "10": 0, "inf": 0} - self._latency_sum = 0.0 - self._trace_enabled = False - self._syscall_log = [] - self._start_time = None - self._cgroup_path = cgroup_path - self._capabilities = deserialize_capabilities(capabilities) - self.termination_reason = None - self._open_files = 0 - self._network_ops = 0 - self._output_bytes = 0 - self._child_work = 0 + self._init_config_wiring( + policy=policy, + cpu_ms=cpu_ms, + mem_bytes=mem_bytes, + wall_time_ms=wall_time_ms, + open_files_max=open_files_max, + network_ops_max=network_ops_max, + output_bytes_max=output_bytes_max, + child_work_max=child_work_max, + allowed_imports=allowed_imports, + numa_node=numa_node, + cgroup_path=cgroup_path, + capabilities=capabilities, + ) + self._reset_runtime_state() # Request the sandbox thread to (re)attach itself to the new cgroup. # The attachment must happen from the sandbox thread's context. msg_id = self._next_attach_msg_id