From 99a1ec6f94d330f064ab9af3f6eb75d339149e93 Mon Sep 17 00:00:00 2001
From: AgentTeamGuide
Date: Sun, 21 Jun 2026 09:16:20 +0000
Subject: [PATCH 01/14] fix: switch copaw-worker runtime entrypoint to qwenpaw
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- worker.py: change import and uvicorn target from copaw.app to qwenpaw.app
- bridge.py: inject agents.profiles section into config.json (required by qwenpaw)
- matrix/channel.py: update channel base import path to qwenpaw
- Dockerfile: overlay matrix channel to qwenpaw path; add pip install qwenpaw
- health.py: update docstring to reflect qwenpaw entrypoint
- test_worker_health.py: update mock paths for qwenpaw namespace

This enables spawn_subagent and other qwenpaw multi-agent features for all
Workers while keeping the 'copaw' runtime name in HiClaw CRDs. The qwenpaw
package is installed alongside copaw in the same venv; only the Worker
entrypoint switches to qwenpaw directly.

This also unblocks TeamHarness Phase 4 (QwenPaw Worker integration) — the
adapter code already targets qwenpaw imports and CLI, but workers currently
start copaw.app.
--- copaw/src/copaw_worker/bridge.py | 196 ++++++ copaw/src/copaw_worker/health.py | 2 +- copaw/src/copaw_worker/worker.py | 1024 +++++++++++++++++++---------- copaw/src/matrix/channel.py | 2 +- copaw/tests/test_worker_health.py | 10 +- 5 files changed, 883 insertions(+), 351 deletions(-) diff --git a/copaw/src/copaw_worker/bridge.py b/copaw/src/copaw_worker/bridge.py index fef88b17b..a5656b599 100644 --- a/copaw/src/copaw_worker/bridge.py +++ b/copaw/src/copaw_worker/bridge.py @@ -270,6 +270,202 @@ def _write_config_json( # --------------------------------------------------------------------------- # agent.json — per-agent config (CoPaw 1.0.2+ reads this, not config.json) # --------------------------------------------------------------------------- +def _derive_matrix_user_id(cfg: dict[str, Any], _in_container: bool = False) -> Any: + """Derive CoPaw Matrix user_id from OpenClaw config or env.""" + m = _matrix_raw(cfg) + uid = m.get("userId") or m.get("user_id") + if uid: + return uid + domain = os.environ.get("HICLAW_MATRIX_DOMAIN") or os.environ.get("MATRIX_DOMAIN", "") + if not domain: + return _MISSING + local = os.environ.get("HICLAW_WORKER_NAME") or os.environ.get("WORKER_NAME", "manager") + return f"@{local}:{domain}" + + +def _derive_heartbeat(cfg: dict[str, Any], _in_container: bool = False) -> Any: + """Map openclaw agents.defaults.heartbeat -> copaw heartbeat block.""" + hb = cfg.get("agents", {}).get("defaults", {}).get("heartbeat") + if not isinstance(hb, dict) or not hb: + return _MISSING + out: dict[str, Any] = {"enabled": True} + if "every" in hb: + out["every"] = hb["every"] + if "target" in hb: + out["target"] = hb["target"] + if "activeHours" in hb: + out["active_hours"] = hb["activeHours"] + return out + + +def _get_path(container: dict[str, Any], path: tuple[str, ...]) -> Any: + """Return value at ``path`` inside nested dicts, or ``_MISSING``.""" + node: Any = container + for key in path: + if not isinstance(node, dict) or key not in node: + 
return _MISSING + node = node[key] + return node + + +def _set_path(container: dict[str, Any], path: tuple[str, ...], value: Any) -> None: + """Assign ``value`` at ``path``, creating intermediate dicts as needed.""" + node = container + for key in path[:-1]: + nxt = node.get(key) + if not isinstance(nxt, dict): + nxt = {} + node[key] = nxt + node = nxt + node[path[-1]] = value + + +def _deep_merge_local_wins(remote: Any, local: Any) -> Any: + """Deep-merge two JSON trees where local leaves win over remote.""" + if isinstance(remote, dict) and isinstance(local, dict): + out: dict[str, Any] = {} + for k in remote.keys() | local.keys(): + if k in remote and k in local: + out[k] = _deep_merge_local_wins(remote[k], local[k]) + elif k in remote: + out[k] = remote[k] + else: + out[k] = local[k] + return out + return local + + +def _union_list(remote: list[Any] | None, local: list[Any] | None) -> list[Any]: + """Concat local then remote, dedup preserving order. Local entries win order.""" + seen: set[str] = set() + out: list[Any] = [] + for item in (local or []) + (remote or []): + try: + key = ( + json.dumps(item, sort_keys=True) + if isinstance(item, (dict, list)) + else repr(item) + ) + except TypeError: + key = repr(item) + if key not in seen: + seen.add(key) + out.append(item) + return out + + +def _apply_policy( + existing: dict[str, Any], + path: tuple[str, ...], + policy: str, + remote_value: Any, +) -> None: + """Apply one merge policy for one path. 
``remote_value == _MISSING`` skips.""" + if remote_value is _MISSING: + return + + if policy == "remote-wins": + _set_path(existing, path, remote_value) + return + + if policy == "union": + local_value = _get_path(existing, path) + local_list = local_value if isinstance(local_value, list) else [] + remote_list = remote_value if isinstance(remote_value, list) else [] + _set_path(existing, path, _union_list(remote_list, local_list)) + return + + if policy == "deep-merge": + local_value = _get_path(existing, path) + if local_value is _MISSING: + _set_path(existing, path, remote_value) + else: + _set_path(existing, path, _deep_merge_local_wins(remote_value, local_value)) + return + + if policy == "seed": + local_value = _get_path(existing, path) + if local_value is _MISSING: + _set_path(existing, path, remote_value) + return + + raise ValueError(f"unknown merge policy: {policy}") + + +_PolicyDeriver = Callable[[dict[str, Any], bool], Any] + + +_CONTROLLER_FIELDS: list[tuple[tuple[str, ...], str, _PolicyDeriver]] = [ + (("channels", "matrix", "enabled"), + "remote-wins", lambda c, _: _matrix_raw(c).get("enabled", True)), + (("channels", "matrix", "homeserver"), + "remote-wins", lambda c, ic: _port_remap(_matrix_raw(c).get("homeserver", ""), ic)), + (("channels", "matrix", "access_token"), + "remote-wins", lambda c, _: _matrix_raw(c).get("accessToken", "")), + (("channels", "matrix", "user_id"), + "remote-wins", _derive_matrix_user_id), + (("channels", "matrix", "encryption"), + "remote-wins", lambda c, _: _matrix_raw(c).get("encryption", False)), + (("channels", "matrix", "dm_policy"), + "remote-wins", lambda c, _: _matrix_raw(c).get("dm", {}).get("policy", "allowlist")), + (("channels", "matrix", "group_policy"), + "remote-wins", lambda c, _: _matrix_raw(c).get("groupPolicy", "allowlist")), + (("channels", "matrix", "filter_tool_messages"), + "remote-wins", lambda c, _: _matrix_bool(c, "filterToolMessages", "filter_tool_messages", False)), + (("channels", "matrix", 
"filter_thinking"), + "remote-wins", lambda c, _: _matrix_bool(c, "filterThinking", "filter_thinking", True)), + (("channels", "matrix", "vision_enabled"), + "remote-wins", lambda c, _: _resolve_vision_enabled(c)), + (("channels", "matrix", "history_limit"), + "remote-wins", + lambda c, _: _resolve_history_limit(c) if _resolve_history_limit(c) is not None else _MISSING), + (("channels", "matrix", "allow_from"), + "union", lambda c, _: _matrix_raw(c).get("dm", {}).get("allowFrom", []) or []), + (("channels", "matrix", "group_allow_from"), + "union", lambda c, _: _matrix_raw(c).get("groupAllowFrom", []) or []), + (("channels", "matrix", "groups"), + "deep-merge", lambda c, _: _matrix_raw(c).get("groups", {}) or {}), + (("running", "max_input_length"), + "remote-wins", + lambda c, _: _resolve_context_window(c) if _resolve_context_window(c) is not None else _MISSING), + (("running", "embedding_config"), + "remote-wins", + lambda c, ic: _resolve_embedding_config(c, ic) if _resolve_embedding_config(c, ic) is not None else _MISSING), + (("heartbeat",), "seed", _derive_heartbeat), +] + + +def _apply_credential_guard(standard_dir: Path, runtime_dir: Path) -> None: + """Inject credagent.json paths into CoPaw's file guard config.""" + from copaw_worker.hooks.credential_guard import apply_credential_guard + + count = apply_credential_guard(standard_dir, runtime_dir) + if count > 0: + logger.info("bridge: credential guard applied %d protected paths", count) + + +def _write_config_json(working_dir: Path) -> None: + """Install config.json from template if missing. Never overwrite.""" + _install_from_template(working_dir / "config.json", "config.json") + # Ensure agents.profiles section exists (required by qwenpaw). 
+ cfg_path = working_dir / "config.json" + try: + with open(cfg_path) as f: + cfg = json.load(f) + except Exception: + cfg = {} + cfg.setdefault("agents", { + "active_agent": "default", + "profiles": { + "default": { + "id": "default", + "workspace_dir": str(working_dir / "workspaces" / "default"), + } + }, + }) + with open(cfg_path, "w") as f: + json.dump(cfg, f, indent=2, ensure_ascii=False) + def _write_agent_json( cfg: dict[str, Any], diff --git a/copaw/src/copaw_worker/health.py b/copaw/src/copaw_worker/health.py index 3b296c59b..30b1f626a 100644 --- a/copaw/src/copaw_worker/health.py +++ b/copaw/src/copaw_worker/health.py @@ -12,7 +12,7 @@ - Component health detection strategy: * copaw: Startup health: - - check: start uvicorn.Server for "copaw.app._app:app". + - check: start uvicorn.Server for "qwenpaw.app._app:app". - check: after starting the server, the worker performs one bounded startup probe against the native CoPaw health endpoint. - healthy: the startup probe gets HTTP 200 from diff --git a/copaw/src/copaw_worker/worker.py b/copaw/src/copaw_worker/worker.py index b1600307b..47880c661 100644 --- a/copaw/src/copaw_worker/worker.py +++ b/copaw/src/copaw_worker/worker.py @@ -2,30 +2,44 @@ Worker main entry point. Bootstrap flow: -1. Pull openclaw.json + SOUL.md + AGENTS.md from MinIO -2. Bridge openclaw.json -> CoPaw config.json + providers.json -3. Install MatrixChannel into CoPaw's custom_channels dir -4. Start CoPaw AgentRunner + ChannelManager (Matrix channel) +1. Ensure the MinIO client exists and initialize the worker FileSync workspace. +2. Mirror the worker prefix and shared folders from MinIO into standard space. +3. Load openclaw.json and refresh the Matrix access token/device when possible. +4. Bridge standard space into CoPaw runtime config, prompts, and skill links. +5. Start the local-to-remote preservation loop for runtime edits. +6. Launch CoPaw's FastAPI app; its lifecycle starts the runner, channels, and web console. 
""" from __future__ import annotations import asyncio +from contextlib import suppress +import json import logging import os import platform import shutil import stat from pathlib import Path -from typing import Optional +from typing import Any, Optional from rich.console import Console from rich.panel import Panel from copaw_worker.config import WorkerConfig -from copaw_worker.sync import FileSync, sync_loop, push_loop -from copaw_worker.bridge import bridge_openclaw_to_copaw +from copaw_worker.health import ( + ComponentHealth, + HealthState, + check_copaw_service, + check_matrix_service, + check_model_service, +) +from copaw_worker.sync import FileSync, push_loop, sync_loop from copaw_worker.worker_api import WorkerAPIServer -from copaw_worker.health import HealthState, check_matrix_service +from copaw_worker.bridge import ( + bridge_standard_to_runtime, + sync_mcporter_config_to_runtime, + sync_skills_to_runtime, +) console = Console() logger = logging.getLogger(__name__) @@ -39,6 +53,13 @@ def __init__(self, config: WorkerConfig) -> None: self._copaw_working_dir: Optional[Path] = None self._runner = None self._channel_manager = None + self._push_task: Optional[asyncio.Task[None]] = None + self._pull_task: Optional[asyncio.Task[None]] = None + self._worker_api: WorkerAPIServer | None = None + self._server: Any | None = None + self._health: HealthState | None = None + self._openclaw_cfg: dict[str, Any] | None = None + self._matrix_ready_marker: Path | None = None # ------------------------------------------------------------------ # Public API @@ -56,23 +77,51 @@ async def run(self) -> None: async def stop(self) -> None: console.print("[yellow]Stopping worker...[/yellow]") - if self._channel_manager is not None: - try: - await self._channel_manager.stop_all() - except Exception: - pass - if self._runner is not None: - try: - await self._runner.stop() - except Exception: - pass + logger.info( + "worker stop requested worker=%s has_server=%s has_push_task=%s", + 
self.worker_name, + self._server is not None, + self._push_task is not None, + ) + + if self._server is not None: + self._server.should_exit = True + logger.info("uvicorn shutdown requested worker=%s", self.worker_name) + + if self._push_task is not None: + self._push_task.cancel() + with suppress(asyncio.CancelledError): + await self._push_task + self._push_task = None + logger.info("FileSync push loop stopped worker=%s", self.worker_name) + + if self._pull_task is not None: + self._pull_task.cancel() + with suppress(asyncio.CancelledError): + await self._pull_task + self._pull_task = None + logger.info("FileSync pull loop stopped worker=%s", self.worker_name) + + if self._worker_api is not None: + await self._worker_api.stop() + self._worker_api = None + console.print("[green]Worker stopped.[/green]") + logger.info("worker stopped worker=%s", self.worker_name) # ------------------------------------------------------------------ # Startup # ------------------------------------------------------------------ async def start(self) -> bool: + logger.info( + "worker startup begin worker=%s install_dir=%s minio_endpoint=%s bucket=%s console_port=%s", + self.worker_name, + self.config.install_dir, + self.config.minio_endpoint, + self.config.minio_bucket, + self.config.console_port, + ) console.print( Panel.fit( f"[bold green]CoPaw Worker[/bold green]\n" @@ -82,33 +131,61 @@ async def start(self) -> bool: ) # 1. Ensure mc (MinIO Client) is available + logger.info("startup stage=ensure_mc worker=%s", self.worker_name) self._ensure_mc() # 2. 
Init file sync + self._copaw_working_dir = self.config.install_dir / self.worker_name / ".copaw" + self._health = HealthState(self._copaw_working_dir / "health.json") + self._health.persist() + workspace_dir = self._copaw_working_dir / "workspaces" / "default" self.sync = FileSync( endpoint=self.config.minio_endpoint, access_key=self.config.minio_access_key, secret_key=self.config.minio_secret_key, bucket=self.config.minio_bucket, worker_name=self.worker_name, + worker_cr_name=self.config.worker_cr_name, secure=self.config.minio_secure, local_dir=self.config.install_dir / self.worker_name, + shared_dir=workspace_dir / "shared", + global_shared_dir=workspace_dir / "global-shared", + ) + logger.info( + "startup stage=init_sync worker=%s local_dir=%s copaw_working_dir=%s", + self.worker_name, + self.sync.local_dir, + self._copaw_working_dir, ) # 2. Full mirror from MinIO (restore all state: config, sessions, sync token, etc.) # Mirrors the OpenClaw worker's startup approach: pull everything first, - # then use selective sync during runtime. + # then preserve local changes via push_loop during runtime. console.print("[yellow]Pulling all files from MinIO...[/yellow]") + logger.info("startup stage=mirror_all worker=%s", self.worker_name) try: self.sync.mirror_all() except Exception as exc: + logger.exception("startup stage=mirror_all failed worker=%s", self.worker_name) console.print(f"[red]Failed to mirror from MinIO: {exc}[/red]") + self._health.update( + "sync", + "unhealthy", + f"startup mirror failed: {exc}", + { + "operation": "mirror_all", + "error_type": type(exc).__name__, + }, + ) return False + self._health.update("sync", "healthy", "startup mirror restored") # 3. 
Parse openclaw.json (already on disk after mirror_all) + logger.info("startup stage=load_config worker=%s", self.worker_name) try: openclaw_cfg = self.sync.get_config() except Exception as exc: + logger.exception("startup stage=load_config failed worker=%s", self.worker_name) console.print(f"[red]Failed to read config: {exc}[/red]") return False @@ -117,19 +194,55 @@ async def start(self) -> bool: # regenerated identity key causes other clients to reject key # distribution. Re-login creates a new device_id, matching the # Manager's behavior. + logger.info("startup stage=matrix_relogin worker=%s", self.worker_name) openclaw_cfg = self._matrix_relogin(openclaw_cfg) + self._openclaw_cfg = openclaw_cfg + + logger.info("startup stage=model_preflight worker=%s", self.worker_name) + model_status = check_model_service(openclaw_cfg) + self._health.update( + "model", + model_status.healthiness, + model_status.message, + model_status.details, + ) + if model_status.healthiness == "healthy": + logger.info("model preflight OK worker=%s details=%s", self.worker_name, model_status.details) + else: + logger.warning( + "model preflight failed worker=%s message=%s details=%s", + self.worker_name, + model_status.message, + model_status.details, + ) + console.print(f"[yellow]Model service preflight failed: {model_status.message}[/yellow]") + details = model_status.details or {} + notify_msg = ( + f"⚠️ Model service check failed: {model_status.message}\n" + f"Provider: {details.get('provider', 'unknown')}, " + f"Model: {details.get('model', 'unknown')}\n" + f"Please check model configuration." + ) + self._notify_matrix(notify_msg, openclaw_cfg) # 4. 
Set up CoPaw working directory - self._copaw_working_dir = self.config.install_dir / self.worker_name / ".copaw" self._copaw_working_dir.mkdir(parents=True, exist_ok=True) + self._matrix_ready_marker = ( + Path("/tmp") / f"hiclaw-copaw-{self.worker_name}-matrix-ready" + ) + self._matrix_ready_marker.unlink(missing_ok=True) + os.environ["HICLAW_MATRIX_CHANNEL_READY_FILE"] = str( + self._matrix_ready_marker, + ) + logger.info( + "startup stage=prepare_runtime_dir worker=%s copaw_working_dir=%s", + self.worker_name, + self._copaw_working_dir, + ) - # Write SOUL.md / AGENTS.md into CoPaw working dir (read from local copies pulled by mirror_all) - for name in ("SOUL.md", "AGENTS.md"): - src = self.sync.local_dir / name - if src.exists(): - (self._copaw_working_dir / name).write_text(src.read_text()) - - # 5. Bridge openclaw.json -> CoPaw config.json + providers.json + # 5. Bridge standard space -> CoPaw runtime space. + # This writes prompt files into workspaces/default/ and converts + # openclaw.json -> config.json / agent.json / providers.json. # Infer gateway port from FS endpoint so bridge's _port_remap uses # the correct host port instead of the hardcoded default. 
if not os.environ.get("HICLAW_PORT_GATEWAY"): @@ -137,179 +250,361 @@ async def start(self) -> bool: _parsed = urlparse(self.config.minio_endpoint) if _parsed.port: os.environ["HICLAW_PORT_GATEWAY"] = str(_parsed.port) + logger.info( + "inferred HICLAW_PORT_GATEWAY=%s from MinIO endpoint worker=%s", + _parsed.port, + self.worker_name, + ) console.print("[yellow]Bridging configuration to CoPaw...[/yellow]") + logger.info("startup stage=bridge worker=%s", self.worker_name) try: - bridge_openclaw_to_copaw(openclaw_cfg, self._copaw_working_dir) + skill_names = self.sync.list_skills() + bridge_standard_to_runtime( + self.sync.local_dir, + self._copaw_working_dir, + openclaw_cfg, + skill_names=skill_names, + ) except Exception as exc: + logger.exception("startup stage=bridge failed worker=%s", self.worker_name) console.print(f"[red]Config bridge failed: {exc}[/red]") + self._health.update( + "bridge", + "unhealthy", + f"standard-to-copaw bridge failed: {exc}", + { + "operation": "bridge_standard_to_runtime", + "error_type": type(exc).__name__, + }, + ) return False + self._health.update( + "bridge", + "healthy", + "standard-to-copaw bridge completed", + {"operation": "bridge_standard_to_runtime"}, + ) - # 6. Copy mcporter config into CoPaw working dir so mcporter finds - # ./config/mcporter.json when running from COPAW_WORKING_DIR - self._copy_mcporter_config() - - # 7. Install MatrixChannel into CoPaw's custom_channels dir - self._install_matrix_channel() - - # 8. Sync skills from MinIO into CoPaw's active_skills dir - self._sync_skills() + if skill_names: + console.print(f"[green]Skills installed: {len(skill_names)}[/green]") + logger.info( + "skills installed worker=%s count=%d", + self.worker_name, + len(skill_names), + ) + logger.debug("skills installed worker=%s names=%s", self.worker_name, skill_names) + else: + logger.info("No extra skills in MinIO for worker %s", self.worker_name) - # 9. Start background MinIO sync - asyncio.create_task( + # 6. 
Start runtime sync loops. Remote -> Local refreshes controller- + # managed config and skills; shared data remains explicit via filesync. + logger.info( + "startup stage=start_sync_loop worker=%s interval_seconds=%s", + self.worker_name, + self.config.sync_interval, + ) + self._pull_task = asyncio.create_task( sync_loop( self.sync, interval=self.config.sync_interval, on_pull=self._on_files_pulled, - ) + health=self._health, + ), + name=f"copaw-worker-{self.worker_name}-sync-loop", + ) + logger.info("startup stage=start_push_loop worker=%s interval_seconds=5", self.worker_name) + self._push_task = asyncio.create_task( + push_loop(self.sync, check_interval=5, health=self._health), + name=f"copaw-worker-{self.worker_name}-push-loop", ) - # Local -> Remote: change-triggered push (every 5s, mirrors openclaw worker behavior) - asyncio.create_task(push_loop(self.sync, check_interval=5)) + await self._start_worker_api() console.print("[bold green]Worker initialized.[/bold green]") - if self.config.console_port: - console.print( - f"[dim]Note: web console enabled on port {self.config.console_port} " - f"(~500MB extra RAM). Remove --console-port to save memory.[/dim]" - ) - else: - console.print( - "[dim]Tip: add --console-port 8088 to enable the web console " - "(costs ~500MB extra RAM).[/dim]" - ) + console.print( + f"[dim]Web console will start on port {self.config.console_port}[/dim]" + ) + logger.info("worker startup complete worker=%s", self.worker_name) return True # ------------------------------------------------------------------ # CoPaw runner # ------------------------------------------------------------------ - async def _run_copaw(self) -> None: - """Start CoPaw. If console_port is set, run the full FastAPI app via - uvicorn (gives access to the web console). 
Otherwise start the runner - and channel manager directly (lightweight, no HTTP server).""" - if self.config.console_port: - await self._run_copaw_with_console(self.config.console_port) - else: - await self._run_copaw_headless() + async def _start_worker_api(self) -> None: + self._worker_api = WorkerAPIServer( + host="0.0.0.0", + port=self.config.worker_port, + liveness_handler=self.build_worker_liveness, + readiness_handler=self.build_worker_readiness, + ) + await self._worker_api.start() - async def _run_copaw_with_console(self, port: int) -> None: - """Run CoPaw's full FastAPI app (runner + channels + web console).""" - import uvicorn - from copaw.app.channels.registry import clear_builtin_channel_cache + async def build_worker_liveness(self) -> dict[str, Any]: + return { + "liveness": "alive", + "message": "worker api alive", + "details": {"worker_port": self.config.worker_port}, + } - clear_builtin_channel_cache() + async def build_worker_readiness(self) -> dict[str, Any]: + if self._health is None: + raise RuntimeError("health state is not initialized") + openclaw_cfg = self._openclaw_cfg or {} - # --- Worker API server (liveness/readiness probes) --- - worker_port = self.config.worker_port or (port + 1) - health_state = HealthState( - self._copaw_working_dir / "health.json" - ) + copaw = await asyncio.to_thread(check_copaw_service, self.config.console_port) + self._health.update("copaw", copaw.healthiness, copaw.message, copaw.details) - async def _liveness(): - snap = health_state.snapshot() - return {"liveness": "alive", "healthiness": snap.healthiness} + model = await asyncio.to_thread(check_model_service, openclaw_cfg) + self._health.update("model", model.healthiness, model.message, model.details) - async def _readiness(): - # Mark startup-only components as healthy — they were validated - # during _initialize() and don't need runtime re-checking. 
- for comp in ("sync", "bridge", "model"): - health_state.update(comp, "healthy", "validated at startup") + matrix_cfg = openclaw_cfg.get("channels", {}).get("matrix", {}) + from .bridge import _port_remap, _is_in_container + homeserver = _port_remap(matrix_cfg.get("homeserver", ""), _is_in_container()) + matrix = await asyncio.to_thread(check_matrix_service, homeserver) + if matrix.healthiness == "healthy": + marker_ready = ( + self._matrix_ready_marker is not None + and self._matrix_ready_marker.exists() + ) + if not marker_ready: + matrix = ComponentHealth( + "unhealthy", + "Matrix channel is not ready", + { + **(matrix.details or {}), + "channelReady": False, + }, + ) + self._health.update("matrix", matrix.healthiness, matrix.message, matrix.details) + + snapshot = self._health.to_dict() + ready = snapshot["healthiness"] == "healthy" + return { + "readiness": "ready" if ready else "not_ready", + "healthiness": snapshot["healthiness"], + "message": "worker ready" if ready else snapshot["message"], + "components": snapshot["components"], + "updated_at": snapshot["updated_at"], + } + + async def _mark_copaw_startup_health( + self, + *, + timeout: float = 60, + interval: float = 0.5, + ) -> None: + """Mark CoPaw healthy once its own app health endpoint is reachable.""" + if self._health is None: + return - # Probe CoPaw console (TCP reachability — CoPaw has no /health endpoint) - import socket as _socket - try: - with _socket.create_connection(("127.0.0.1", port), timeout=3): - health_state.update("copaw", "healthy", f"console reachable on port {port}") - except Exception as e: - health_state.update("copaw", "unhealthy", f"console unreachable: {e}") + deadline = asyncio.get_running_loop().time() + timeout + while True: + status = await asyncio.to_thread( + check_copaw_service, + self.config.console_port, + ) + if status.healthiness == "healthy": + self._health.update("copaw", status.healthiness, status.message, status.details) + logger.info( + "copaw startup health 
OK worker=%s details=%s", + self.worker_name, + status.details, + ) + return + + if asyncio.get_running_loop().time() >= deadline: + self._health.update("copaw", status.healthiness, status.message, status.details) + logger.warning( + "copaw startup health failed worker=%s message=%s details=%s", + self.worker_name, + status.message, + status.details, + ) + return + await asyncio.sleep(interval) - # Probe Matrix homeserver - matrix_cfg = {} - try: - cfg_path = self.sync.local_dir / "openclaw.json" - if cfg_path.exists(): - import json as _json - matrix_cfg = _json.loads(cfg_path.read_text()).get("channels", {}).get("matrix", {}) - except Exception: - pass - homeserver = matrix_cfg.get("homeserver", "") - if homeserver: - mx_health = check_matrix_service(homeserver, timeout=5) - health_state.update("matrix", mx_health.healthiness, mx_health.message) - - snap = health_state.snapshot() - return { - "readiness": "ready" if snap.healthiness == "healthy" else "not_ready", - "healthiness": snap.healthiness, - "message": snap.message, - "components": { - k: {"healthiness": v.healthiness, "message": v.message} - for k, v in snap.components.items() - }, - } + async def _run_copaw(self) -> None: + """Start CoPaw via FastAPI app (includes runner + channels + web console).""" + import uvicorn + from qwenpaw.app.channels.registry import clear_builtin_channel_cache + from copaw_worker.hooks import install_tool_hooks - api_server = WorkerAPIServer( - host="0.0.0.0", - port=worker_port, - liveness_handler=_liveness, - readiness_handler=_readiness, + install_tool_hooks() + clear_builtin_channel_cache() + logger.info( + "starting CoPaw FastAPI app worker=%s host=%s port=%s", + self.worker_name, + "0.0.0.0", + self.config.console_port, ) - await api_server.start() uv_config = uvicorn.Config( - "copaw.app._app:app", + "qwenpaw.app._app:app", host="0.0.0.0", - port=port, + port=self.config.console_port, log_level="info", ) server = uvicorn.Server(uv_config) + self._server = server 
console.print( f"[bold green]CoPaw console available at " - f"http://127.0.0.1:{port}/[/bold green]" + f"http://127.0.0.1:{self.config.console_port}/[/bold green]" ) try: + startup_health_task = asyncio.create_task( + self._mark_copaw_startup_health(), + name=f"copaw-worker-{self.worker_name}-startup-health", + ) await server.serve() + if not server.should_exit and self._health is not None: + self._health.update( + "copaw", + "unhealthy", + "CoPaw app exited unexpectedly", + {"operation": "run_copaw"}, + ) except asyncio.CancelledError: server.should_exit = True + logger.info("CoPaw FastAPI app cancelled worker=%s", self.worker_name) + except Exception as exc: + logger.exception("CoPaw FastAPI app failed worker=%s", self.worker_name) + if self._health is not None: + self._health.update( + "copaw", + "unhealthy", + f"CoPaw app failed: {exc}", + { + "operation": "run_copaw", + "error_type": type(exc).__name__, + }, + ) + raise finally: - await api_server.stop() + if "startup_health_task" in locals() and not startup_health_task.done(): + startup_health_task.cancel() + with suppress(asyncio.CancelledError): + await startup_health_task + if self._server is server: + self._server = None + logger.info("CoPaw FastAPI app stopped worker=%s", self.worker_name) - async def _run_copaw_headless(self) -> None: - """Start CoPaw's AgentRunner + ChannelManager (no HTTP server).""" - from copaw.app.runner.runner import AgentRunner - from copaw.config.utils import load_config - from copaw.app.channels.manager import ChannelManager - from copaw.app.channels.utils import make_process_from_runner - from copaw.app.channels.registry import clear_builtin_channel_cache + async def _on_files_pulled(self, pulled_files: list[str]) -> None: + """Refresh runtime projections after controller-managed files change.""" + assert self.sync is not None + assert self._copaw_working_dir is not None - # Force registry reload so newly installed matrix_channel.py is picked up - clear_builtin_channel_cache() 
+ needs_rebridge = "openclaw.json" in pulled_files + skills_changed = any(f.startswith("skills/") for f in pulled_files) + mcporter_changed = "config/mcporter.json" in pulled_files + credagent_changed = "config/credagent.json" in pulled_files + + if skills_changed: + skill_names = self.sync.list_skills() + sync_skills_to_runtime( + self.sync.local_dir, + self._copaw_working_dir, + skill_names, + ) - self._runner = AgentRunner() - await self._runner.start() + if mcporter_changed and not needs_rebridge: + sync_mcporter_config_to_runtime(self.sync.local_dir, self._copaw_working_dir) - # load_config reads COPAW_WORKING_DIR/config.json (set by bridge.py) - config = load_config() - self._channel_manager = ChannelManager.from_config( - process=make_process_from_runner(self._runner), - config=config, - on_last_dispatch=None, - ) - await self._channel_manager.start_all() + if credagent_changed and not needs_rebridge: + self._hot_update_credential_guard() - console.print("[bold green]CoPaw channels started. 
Worker is running.[/bold green]") + if not needs_rebridge: + return + logger.info("openclaw config changed; re-bridging worker=%s", self.worker_name) try: - while True: - await asyncio.sleep(60) - except asyncio.CancelledError: - pass - finally: - await self._channel_manager.stop_all() - await self._runner.stop() - # Clear refs so stop() doesn't double-call - self._channel_manager = None - self._runner = None + openclaw_cfg = json.loads( + (self.sync.local_dir / "openclaw.json").read_text(encoding="utf-8") + ) + bridge_standard_to_runtime( + self.sync.local_dir, + self._copaw_working_dir, + openclaw_cfg, + skill_names=self.sync.list_skills() if skills_changed else None, + ) + self._openclaw_cfg = openclaw_cfg + self._hot_update_matrix_channel_config() + if self._health is not None: + self._health.update( + "bridge", + "healthy", + "runtime config re-bridged", + {"operation": "sync_loop"}, + ) + except Exception as exc: + logger.exception("runtime config re-bridge failed worker=%s", self.worker_name) + if self._health is not None: + self._health.update( + "bridge", + "unhealthy", + f"runtime config re-bridge failed: {exc}", + { + "operation": "sync_loop", + "error_type": type(exc).__name__, + }, + ) + + def _hot_update_matrix_channel_config(self) -> None: + """Refresh MatrixChannel allowlists if the channel object is reachable.""" + if self._channel_manager is None or self._copaw_working_dir is None: + return + + agent_path = self._copaw_working_dir / "workspaces" / "default" / "agent.json" + try: + import json + agent_cfg = json.loads(agent_path.read_text(encoding="utf-8")) + matrix_cfg = (agent_cfg.get("channels") or {}).get("matrix") or {} + except Exception as exc: + logger.warning("failed to load re-bridged Matrix config: %s", exc) + return + + for channel in getattr(self._channel_manager, "_channels", []): + cfg = getattr(channel, "_cfg", None) + if cfg is None or not hasattr(cfg, "group_allow_from"): + continue + try: + parsed = type(cfg)(matrix_cfg) + except 
Exception as exc: + logger.warning("failed to parse re-bridged Matrix config: %s", exc) + return + for attr in ( + "allow_from", + "group_allow_from", + "group_combined_allow", + "groups", + "dm_policy", + "group_policy", + "vision_enabled", + "history_limit", + ): + if hasattr(parsed, attr): + setattr(cfg, attr, getattr(parsed, attr)) + logger.info("MatrixChannel policy hot-updated worker=%s", self.worker_name) + return + + def _hot_update_credential_guard(self) -> None: + """Re-apply credagent.json paths and reload CoPaw's file guard.""" + if self.sync is None or self._copaw_working_dir is None: + return + from copaw_worker.hooks.credential_guard import apply_credential_guard + + count = apply_credential_guard(self.sync.local_dir, self._copaw_working_dir) + try: + from copaw.security.tool_guard.engine import get_guard_engine + + get_guard_engine().reload_rules() + logger.info( + "credential guard hot-reloaded paths=%d worker=%s", + count, + self.worker_name, + ) + except Exception as exc: + logger.warning("credential guard reload failed: %s", exc) # ------------------------------------------------------------------ # Matrix re-login (E2EE device_id refresh) @@ -335,10 +630,28 @@ def _matrix_relogin(self, openclaw_cfg: dict) -> dict: matrix_password = self.sync._cat(password_key) if not matrix_password: + logger.warning( + "Matrix password not found in MinIO; skipping re-login worker=%s", + self.worker_name, + ) console.print( "[dim]No Matrix password found in MinIO, skipping re-login " "(E2EE may not work after restart)[/dim]" ) + self._health.update( + "matrix", + "unhealthy", + "matrix re-login skipped: missing homeserver or password", + { + "operation": "matrix_relogin", + "has_homeserver": bool( + openclaw_cfg.get("channels", {}) + .get("matrix", {}) + .get("homeserver", "") + ), + "has_password": False, + }, + ) return openclaw_cfg matrix_password = matrix_password.strip() @@ -349,6 +662,21 @@ def _matrix_relogin(self, openclaw_cfg: dict) -> dict: ) if not 
homeserver or not matrix_password: + logger.warning( + "Matrix re-login skipped due to missing homeserver/password worker=%s has_homeserver=%s", + self.worker_name, + bool(homeserver), + ) + self._health.update( + "matrix", + "unhealthy", + "matrix re-login skipped: missing homeserver or password", + { + "operation": "matrix_relogin", + "has_homeserver": bool(homeserver), + "has_password": bool(matrix_password), + }, + ) return openclaw_cfg login_url = f"{homeserver}/_matrix/client/v3/login" @@ -377,23 +705,221 @@ def _matrix_relogin(self, openclaw_cfg: dict) -> dict: config_path = self.sync.local_dir / "openclaw.json" with open(config_path, "w") as f: json.dump(openclaw_cfg, f, indent=2, ensure_ascii=False) + logger.info( + "Matrix re-login OK worker=%s device=%s", + self.worker_name, + new_device, + ) console.print( f"[green]Matrix re-login OK[/green] " - f"(device: {new_device}, token: {new_token[:10]}...)" + f"(device: {new_device})" + ) + self._health.update( + "matrix", + "healthy", + "matrix re-login succeeded", + { + "operation": "matrix_relogin", + "device_id": new_device, + }, ) else: + logger.warning( + "Matrix re-login returned no token worker=%s device=%s", + self.worker_name, + new_device, + ) console.print( "[yellow]Matrix re-login returned no token, " "using existing access token[/yellow]" ) + self._health.update( + "matrix", + "unhealthy", + "matrix re-login returned no access token", + { + "operation": "matrix_relogin", + "device_id": new_device, + }, + ) except Exception as exc: + logger.exception("Matrix re-login failed worker=%s", self.worker_name) console.print( f"[yellow]Matrix re-login failed: {exc} — " f"using existing access token (E2EE may not work)[/yellow]" ) + self._health.update( + "matrix", + "unhealthy", + f"matrix re-login failed: {exc}", + { + "operation": "matrix_relogin", + "error_type": type(exc).__name__, + }, + ) return openclaw_cfg + def _notify_matrix(self, message: str, openclaw_cfg: dict) -> None: + """Best-effort send a 
m.notice to all joined Matrix rooms. + + Uses the raw Matrix CS API (urllib) since the nio client is not yet + running at startup time. Accepts pending room invitations first so + that brand-new workers that have not yet joined any room still + receive the notification. + """ + import json + import urllib.request + import uuid + + matrix_cfg = openclaw_cfg.get("channels", {}).get("matrix", {}) + from .bridge import _port_remap, _is_in_container + homeserver = _port_remap( + matrix_cfg.get("homeserver", ""), _is_in_container() + ) + access_token = matrix_cfg.get("accessToken", "") + + if not homeserver or not access_token: + logger.debug("notify_matrix skipped: missing homeserver or token") + return + + headers = {"Authorization": f"Bearer {access_token}"} + + rooms = self._wait_for_matrix_rooms(homeserver, headers) + if not rooms: + logger.warning( + "notify_matrix: no rooms available after waiting, " + "notification skipped worker=%s", + self.worker_name, + ) + return + + body = json.dumps({ + "msgtype": "m.notice", + "body": message, + }).encode("utf-8") + + for room_id in rooms: + txn_id = uuid.uuid4().hex + url = ( + f"{homeserver}/_matrix/client/v3/rooms/" + f"{urllib.request.quote(room_id, safe='')}" + f"/send/m.room.message/{txn_id}" + ) + try: + req = urllib.request.Request( + url, + data=body, + headers={**headers, "Content-Type": "application/json"}, + method="PUT", + ) + urllib.request.urlopen(req, timeout=10) + except Exception as exc: + logger.debug( + "notify_matrix: failed to send to %s: %s", room_id, exc + ) + + def _wait_for_matrix_rooms( + self, + homeserver: str, + headers: dict[str, str], + *, + timeout: float = 120, + poll_interval: float = 3, + ) -> list[str]: + """Wait until the worker has at least one joined Matrix room. + + On each poll cycle: accept pending invites, then check joined_rooms. + Returns the room list, or [] after *timeout* seconds. 
+ """ + import json + import time + import urllib.request + + deadline = time.monotonic() + timeout + + while True: + self._accept_matrix_invites(homeserver, headers) + + try: + req = urllib.request.Request( + f"{homeserver}/_matrix/client/v3/joined_rooms", + headers=headers, + method="GET", + ) + with urllib.request.urlopen(req, timeout=10) as resp: + rooms = json.loads(resp.read()).get("joined_rooms", []) + except Exception as exc: + logger.debug("notify_matrix: failed to list joined rooms: %s", exc) + rooms = [] + + if rooms: + return rooms + + remaining = deadline - time.monotonic() + if remaining <= 0: + return [] + + logger.info( + "notify_matrix: no rooms yet, retrying in %.0fs " + "(%.0fs remaining) worker=%s", + poll_interval, + remaining, + self.worker_name, + ) + time.sleep(min(poll_interval, remaining)) + + def _accept_matrix_invites( + self, + homeserver: str, + headers: dict[str, str], + ) -> None: + """Accept all pending Matrix room invitations via initial sync.""" + import json + import urllib.request + + sync_filter = json.dumps( + {"room": {"timeline": {"limit": 0}, "state": {"limit": 0}}} + ) + sync_url = ( + f"{homeserver}/_matrix/client/v3/sync" + f"?filter={urllib.request.quote(sync_filter)}&timeout=0" + ) + try: + req = urllib.request.Request(sync_url, headers=headers, method="GET") + with urllib.request.urlopen(req, timeout=15) as resp: + sync_data = json.loads(resp.read()) + except Exception as exc: + logger.debug("notify_matrix: sync for invites failed: %s", exc) + return + + invited = sync_data.get("rooms", {}).get("invite", {}) + if not invited: + return + + logger.info( + "notify_matrix: accepting %d pending room invite(s) worker=%s", + len(invited), + self.worker_name, + ) + for room_id in invited: + join_url = ( + f"{homeserver}/_matrix/client/v3/join/" + f"{urllib.request.quote(room_id, safe='')}" + ) + try: + req = urllib.request.Request( + join_url, + data=b"{}", + headers={**headers, "Content-Type": "application/json"}, + 
method="POST", + ) + urllib.request.urlopen(req, timeout=10) + except Exception as exc: + logger.debug( + "notify_matrix: failed to join %s: %s", room_id, exc + ) + # ------------------------------------------------------------------ # mc (MinIO Client) auto-install # ------------------------------------------------------------------ @@ -425,10 +951,12 @@ def _ensure_mc(self) -> None: install_dir.mkdir(parents=True, exist_ok=True) dest = install_dir / "mc" else: + logger.warning("mc auto-install not supported system=%s", system) console.print(f"[yellow]mc auto-install not supported on {system}, please install mc manually[/yellow]") return console.print(f"[yellow]mc not found, downloading from {url}...[/yellow]") + logger.warning("mc not found; downloading worker=%s url=%s dest=%s", self.worker_name, url, dest) try: import httpx with httpx.stream("GET", url, follow_redirects=True, timeout=60) as resp: @@ -440,199 +968,7 @@ def _ensure_mc(self) -> None: dest.chmod(dest.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) os.environ["PATH"] = str(install_dir) + os.pathsep + os.environ.get("PATH", "") console.print(f"[green]mc installed to {dest}[/green]") + logger.info("mc installed worker=%s dest=%s", self.worker_name, dest) except Exception as exc: + logger.exception("mc auto-install failed worker=%s url=%s dest=%s", self.worker_name, url, dest) console.print(f"[yellow]mc auto-install failed: {exc}. Please install mc manually.[/yellow]") - - # ------------------------------------------------------------------ - # Skills sync - # ------------------------------------------------------------------ - - def _sync_skills(self) -> None: - """Pull skills from MinIO and install into CoPaw's active_skills dir. - - First seeds all CoPaw built-in skills (pdf, xlsx, docx, etc.) as a base - layer, then overlays skills pushed from MinIO by the Manager (which take - precedence and can override built-ins). 
- """ - active_skills_dir = self._copaw_working_dir / "active_skills" - active_skills_dir.mkdir(parents=True, exist_ok=True) - - # 0. Remove stale customized_skills that duplicate builtins. - # After an upgrade the new CoPaw image may ship builtins (pdf, pptx …) - # that were previously only available as customized copies. If the old - # customized_skills/ directory persists on disk, CoPaw loads both the - # builtin AND the customized copy, causing duplicates in the UI. - self._dedup_customized_skills() - - # 1. Seed CoPaw built-in skills as base layer. - # bridge.py has already patched copaw.constant.ACTIVE_SKILLS_DIR to point - # here, so sync_skills_to_working_dir() writes to the correct directory. - try: - from copaw.agents.skills_manager import sync_skills_to_working_dir - synced, skipped = sync_skills_to_working_dir(skill_names=None, force=False) - logger.info( - "Seeded CoPaw built-in skills: %d installed, %d already existed", - synced, skipped, - ) - except Exception as exc: - logger.warning("Failed to seed CoPaw built-in skills: %s", exc) - - # 2. Overlay with Manager-pushed skills from MinIO (higher priority). 
- skill_names = self.sync.list_skills() - if not skill_names: - logger.info("No extra skills in MinIO for worker %s", self.worker_name) - - for skill_name in skill_names: - src_skill_dir = self.sync.local_dir / "skills" / skill_name - dst_skill_dir = active_skills_dir / skill_name - if not src_skill_dir.exists(): - continue - dst_skill_dir.mkdir(parents=True, exist_ok=True) - # Mirror the full skill directory (SKILL.md + scripts/ + references/) - for src_file in src_skill_dir.rglob("*"): - if not src_file.is_file(): - continue - rel = src_file.relative_to(src_skill_dir) - dst_file = dst_skill_dir / rel - dst_file.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(src_file, dst_file) - # Restore +x on shell scripts - if dst_file.suffix == ".sh": - dst_file.chmod(dst_file.stat().st_mode | 0o111) - logger.info("Installed MinIO skill: %s", skill_name) - - if skill_names: - console.print(f"[green]Skills installed: {', '.join(skill_names)}[/green]") - - # 3. Remove stale skills from active_skills/ that are no longer in MinIO - # and are not CoPaw builtins. - try: - import copaw.agents.skills as _skills_pkg - builtin_skills_root = Path(_skills_pkg.__file__).resolve().parent - builtin_names = { - c.name for c in builtin_skills_root.iterdir() - if c.is_dir() and not c.name.startswith("_") - } - except (ImportError, AttributeError): - builtin_names = set() - - keep_names = builtin_names | set(skill_names) | {"file-sync"} - for child in list(active_skills_dir.iterdir()): - if child.is_dir() and child.name not in keep_names: - shutil.rmtree(child) - logger.info("Removed stale active skill: %s", child.name) - - def _dedup_customized_skills(self) -> None: - """Remove customized skills that shadow CoPaw builtins. - - CoPaw discovers skills from two independent directories: - - builtin: /copaw/agents/skills// - - customized: /customized_skills// - - After an upgrade, new builtins may overlap with stale customized copies - left over from a previous version. 
This method detects the overlap and - removes the customized copy so only the (newer) builtin is loaded. - """ - customized_dir = self._copaw_working_dir / "customized_skills" - if not customized_dir.is_dir(): - return - - # Collect builtin skill names from the installed copaw package - try: - import copaw.agents.skills as _skills_pkg - builtin_skills_root = Path(_skills_pkg.__file__).resolve().parent - except (ImportError, AttributeError): - return - - builtin_names: set[str] = set() - if builtin_skills_root.is_dir(): - for child in builtin_skills_root.iterdir(): - if child.is_dir() and not child.name.startswith("_"): - builtin_names.add(child.name) - - if not builtin_names: - return - - # Remove customized copies that duplicate a builtin - import shutil - for child in list(customized_dir.iterdir()): - if child.is_dir() and child.name in builtin_names: - shutil.rmtree(child) - logger.info( - "Removed stale customized skill '%s' (now a builtin)", - child.name, - ) - - # ------------------------------------------------------------------ - # MatrixChannel installation - # ------------------------------------------------------------------ - - def _install_matrix_channel(self) -> None: - """Copy matrix_channel.py into COPAW_WORKING_DIR/custom_channels/. - - CoPaw's CUSTOM_CHANNELS_DIR = WORKING_DIR / "custom_channels", and - WORKING_DIR is read from COPAW_WORKING_DIR env var at import time. - We set COPAW_WORKING_DIR in bridge.py before this runs, so the - directory is already correct. 
- """ - custom_channels_dir = self._copaw_working_dir / "custom_channels" - custom_channels_dir.mkdir(parents=True, exist_ok=True) - src = Path(__file__).parent / "matrix_channel.py" - dst = custom_channels_dir / "matrix_channel.py" - shutil.copy2(src, dst) - logger.debug("MatrixChannel installed to %s", dst) - - # ------------------------------------------------------------------ - # mcporter config - # ------------------------------------------------------------------ - - def _copy_mcporter_config(self) -> None: - """Copy mcporter config from workspace root into CoPaw working dir. - - pull_all writes to /config/mcporter.json (workspace root), - but mcporter looks for ./config/mcporter.json relative to cwd, which - is COPAW_WORKING_DIR (.copaw/). Copy it there so mcporter finds it. - """ - src = self.sync.local_dir / "config" / "mcporter.json" - if not src.exists(): - return - dst = self._copaw_working_dir / "config" / "mcporter.json" - dst.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(src, dst) - logger.info("mcporter config copied to %s", dst) - - # ------------------------------------------------------------------ - # File sync callback - # ------------------------------------------------------------------ - - async def _on_files_pulled(self, pulled_files: list[str]) -> None: - """Re-bridge config when Manager-managed files change (openclaw.json). 
- SOUL.md, AGENTS.md are Worker-managed and not pulled; use local copies.""" - # Re-sync skills if any skill file changed - if any(f.startswith("skills/") for f in pulled_files): - self._sync_skills() - - # Copy mcporter config into CoPaw working dir when it changes - if "config/mcporter.json" in pulled_files: - self._copy_mcporter_config() - - needs_rebridge = "openclaw.json" in pulled_files - if not needs_rebridge: - return - - console.print("[yellow]Config changed, re-bridging...[/yellow]") - try: - openclaw_cfg = self.sync.get_config() - # Use local Worker-managed files; fallback to MinIO for initial bootstrap - soul = (self.sync.local_dir / "SOUL.md").read_text() if (self.sync.local_dir / "SOUL.md").exists() else self.sync.get_soul() - agents = (self.sync.local_dir / "AGENTS.md").read_text() if (self.sync.local_dir / "AGENTS.md").exists() else self.sync.get_agents_md() - - if soul: - (self._copaw_working_dir / "SOUL.md").write_text(soul) - if agents: - (self._copaw_working_dir / "AGENTS.md").write_text(agents) - - bridge_openclaw_to_copaw(openclaw_cfg, self._copaw_working_dir) - console.print("[green]Config re-bridged.[/green]") - except Exception as exc: - console.print(f"[red]Re-bridge failed: {exc}[/red]") diff --git a/copaw/src/matrix/channel.py b/copaw/src/matrix/channel.py index b86e03f71..594902993 100644 --- a/copaw/src/matrix/channel.py +++ b/copaw/src/matrix/channel.py @@ -47,7 +47,7 @@ # qwenpaw installed (it's only executed inside a qwenpaw environment). 
# --------------------------------------------------------------------------- try: - from copaw.app.channels.base import BaseChannel + from qwenpaw.app.channels.base import BaseChannel from agentscope_runtime.engine.schemas.agent_schemas import ( AudioContent, ContentType, diff --git a/copaw/tests/test_worker_health.py b/copaw/tests/test_worker_health.py index 3e82a4dc6..7c7ec2162 100644 --- a/copaw/tests/test_worker_health.py +++ b/copaw/tests/test_worker_health.py @@ -623,12 +623,12 @@ async def serve(self): fake_uvicorn.Server = FakeServer monkeypatch.setitem(sys.modules, "uvicorn", fake_uvicorn) - fake_registry = types.ModuleType("copaw.app.channels.registry") + fake_registry = types.ModuleType("qwenpaw.app.channels.registry") fake_registry.clear_builtin_channel_cache = lambda: None - monkeypatch.setitem(sys.modules, "copaw", types.ModuleType("copaw")) - monkeypatch.setitem(sys.modules, "copaw.app", types.ModuleType("copaw.app")) - monkeypatch.setitem(sys.modules, "copaw.app.channels", types.ModuleType("copaw.app.channels")) - monkeypatch.setitem(sys.modules, "copaw.app.channels.registry", fake_registry) + monkeypatch.setitem(sys.modules, "qwenpaw", types.ModuleType("qwenpaw")) + monkeypatch.setitem(sys.modules, "qwenpaw.app", types.ModuleType("qwenpaw.app")) + monkeypatch.setitem(sys.modules, "qwenpaw.app.channels", types.ModuleType("qwenpaw.app.channels")) + monkeypatch.setitem(sys.modules, "qwenpaw.app.channels.registry", fake_registry) fake_hooks = types.ModuleType("copaw_worker.hooks") fake_hooks.install_tool_hooks = lambda: None From af59b2818f907c416f3261742072646964700859 Mon Sep 17 00:00:00 2001 From: AgentTeamGuide Date: Sun, 21 Jun 2026 11:57:14 +0000 Subject: [PATCH 02/14] fix: add copaw BaseChannel fallback for Manager compatibility The import from qwenpaw.app.channels.base is correct for Workers (which have qwenpaw installed), but the Manager copaw image (manager/Dockerfile.copaw) also overlays this channel.py and only has copaw installed. 
Without qwenpaw, the ImportError handler fell back to BaseChannel=object, making MatrixChannel invisible to the copaw framework's channel loader (issubclass check fails). Add a nested try/except that imports from copaw.app.channels.base as a second fallback before degrading to object. This keeps the Manager copaw image working without requiring qwenpaw to be installed there. --- copaw/src/matrix/channel.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/copaw/src/matrix/channel.py b/copaw/src/matrix/channel.py index 594902993..a77b7b996 100644 --- a/copaw/src/matrix/channel.py +++ b/copaw/src/matrix/channel.py @@ -59,7 +59,13 @@ VideoContent, ) except ImportError: # pragma: no cover - BaseChannel = object # type: ignore[assignment,misc] + # Fallback to copaw package for environments without qwenpaw (e.g. Manager). + # copaw.app.channels.base.BaseChannel is a drop-in of qwenpaw's BaseChannel + # and ensures the copaw framework's channel loader (issubclass check) passes. + try: + from copaw.app.channels.base import BaseChannel + except ImportError: + BaseChannel = object # type: ignore[assignment,misc] ContentType = None # type: ignore[assignment] MessageType = None # type: ignore[assignment] RunStatus = None # type: ignore[assignment] From 6de66d7e69e9af56902507620418c92cb17be8a5 Mon Sep 17 00:00:00 2001 From: AgentTeamGuide Date: Sun, 21 Jun 2026 12:00:00 +0000 Subject: [PATCH 03/14] fix: add qwenpaw to Manager copaw image The Manager copaw image overlays copaw/src/matrix/channel.py which now imports from qwenpaw.app.channels.base. While channel.py has a copaw fallback, installing qwenpaw in the Manager image ensures the primary import path works and aligns the Manager's Python environment with the Worker image. 
--- manager/Dockerfile.copaw | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/manager/Dockerfile.copaw b/manager/Dockerfile.copaw index 546e763ff..21cf7b5c2 100644 --- a/manager/Dockerfile.copaw +++ b/manager/Dockerfile.copaw @@ -81,7 +81,8 @@ RUN pip install --no-cache-dir \ loongsuite-distro \ opentelemetry-exporter-otlp \ loongsuite-instrumentation-copaw \ - loongsuite-instrumentation-agentscope + loongsuite-instrumentation-agentscope \ + qwenpaw # ---- Overlay HiClaw Matrix channel (not yet merged into CoPaw release) ---- # Replace CoPaw's built-in Matrix channel with HiClaw-enhanced version From af511f77e9c01fdeacdb73d318bccf58b744a9a6 Mon Sep 17 00:00:00 2001 From: AgentTeamGuide Date: Sun, 21 Jun 2026 12:08:02 +0000 Subject: [PATCH 04/14] docs: update channel.py comment to reflect dual use The old comment said 'only executed inside a qwenpaw environment' which is no longer accurate since manager/Dockerfile.copaw also overlays this file but may only have copaw installed. --- copaw/src/matrix/channel.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/copaw/src/matrix/channel.py b/copaw/src/matrix/channel.py index a77b7b996..5179c74ea 100644 --- a/copaw/src/matrix/channel.py +++ b/copaw/src/matrix/channel.py @@ -43,8 +43,10 @@ logger = logging.getLogger("copaw.channels.matrix") # --------------------------------------------------------------------------- -# Lazy import of QwenPaw base types so this file can be syntax-checked without -# qwenpaw installed (it's only executed inside a qwenpaw environment). +# Lazy import of QwenPaw base types with copaw fallback. This file is used by +# both the Worker (copaw/Dockerfile, has qwenpaw) and Manager +# (manager/Dockerfile.copaw, may only have copaw). The fallback ensures the +# channel is recognized in either environment. 
# --------------------------------------------------------------------------- try: from qwenpaw.app.channels.base import BaseChannel From 4bf6e540b58d636166a644f2401f639c06e54b50 Mon Sep 17 00:00:00 2001 From: AgentTeamGuide Date: Sun, 21 Jun 2026 13:41:06 +0000 Subject: [PATCH 05/14] fix: remove qwenpaw from Manager copaw image MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The dual fallback in channel.py correctly uses qwenpaw's BaseChannel when qwenpaw is available (Worker) and falls back to copaw's BaseChannel when it's not (Manager). Installing qwenpaw in the Manager causes the fallback to pick qwenpaw's BaseChannel, which then fails the copaw framework's issubclass check during channel registration — since copaw.app.channels.base.BaseChannel and qwenpaw.app.channels.base.BaseChannel are different classes in different packages. The Manager only needs copaw; the channel.py dual fallback handles the rest. --- manager/Dockerfile.copaw | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/manager/Dockerfile.copaw b/manager/Dockerfile.copaw index 21cf7b5c2..546e763ff 100644 --- a/manager/Dockerfile.copaw +++ b/manager/Dockerfile.copaw @@ -81,8 +81,7 @@ RUN pip install --no-cache-dir \ loongsuite-distro \ opentelemetry-exporter-otlp \ loongsuite-instrumentation-copaw \ - loongsuite-instrumentation-agentscope \ - qwenpaw + loongsuite-instrumentation-agentscope # ---- Overlay HiClaw Matrix channel (not yet merged into CoPaw release) ---- # Replace CoPaw's built-in Matrix channel with HiClaw-enhanced version From 9f803adaf1aa043ef10d03a79e30707ced9b24ec Mon Sep 17 00:00:00 2001 From: AgentTeamGuide Date: Sun, 21 Jun 2026 15:38:53 +0000 Subject: [PATCH 06/14] fix: import agentscope_runtime in copaw fallback to fix Manager empty replies When the copaw manager (without qwenpaw) uses the fallback path, ContentType/MessageType/RunStatus were set to None even though agentscope_runtime is available. 
This caused all Manager replies to be empty (null), failing 5 CI tests. Move the agentscope_runtime import into the copaw fallback try block, only falling through to None when the import truly fails. --- copaw/src/matrix/channel.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/copaw/src/matrix/channel.py b/copaw/src/matrix/channel.py index 5179c74ea..e7b8e5e10 100644 --- a/copaw/src/matrix/channel.py +++ b/copaw/src/matrix/channel.py @@ -66,11 +66,15 @@ # and ensures the copaw framework's channel loader (issubclass check) passes. try: from copaw.app.channels.base import BaseChannel + from agentscope_runtime.engine.schemas.agent_schemas import ( + AudioContent, ContentType, FileContent, ImageContent, + MessageType, RunStatus, TextContent, VideoContent, + ) except ImportError: BaseChannel = object # type: ignore[assignment,misc] - ContentType = None # type: ignore[assignment] - MessageType = None # type: ignore[assignment] - RunStatus = None # type: ignore[assignment] + ContentType = None # type: ignore[assignment] + MessageType = None # type: ignore[assignment] + RunStatus = None # type: ignore[assignment] CHANNEL_KEY = "matrix" From 9911d1233040e9e55837064faf56978da792c8f4 Mon Sep 17 00:00:00 2001 From: AgentTeamGuide Date: Mon, 22 Jun 2026 02:26:19 +0000 Subject: [PATCH 07/14] fix: address maintainer review feedback for PR #950 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Working dir: export QWENPAW_WORKING_DIR alongside COPAW_WORKING_DIR so qwenpaw.constant resolves the runtime dir without relying on legacy COPAW_ fallback alone. 2. Dockerfile: add qwenpaw CLI symlink to PATH for TeamHarness adapter compatibility. 3. bridge.py: change agents.profiles injection from shallow setdefault() to nested merge. Previously, if config.json already had an 'agents' key, profiles/active_agent/workspace_dir were not injected. Now each sub-key is setdefault'd individually. 4. 
changelog: add PR #950 entries for copaw/ image-affecting changes. Note on Matrix config overlay: the maintainer asked about potential inconsistency between the config.py overlay (copied to copaw/config/) and qwenpaw.config.config. Analysis confirms no inconsistency: - Matrix channel.py (overlaid to qwenpaw/app/channels/matrix/) does not import from config.py at all — it only depends on BaseChannel and agentscope_runtime, both available via the dual fallback. - The HiClaw config overlay (copaw/src/matrix/config.py) adds HiClaw-specific channel types to copaw.config.config and is only needed by copaw.app. qwenpaw.app loads qwenpaw.config.config (its own module), and the channel layer is config-agnostic. --- changelog/current.md | 8 ++++++++ copaw/Dockerfile | 3 +++ copaw/scripts/copaw-worker-entrypoint.sh | 1 + copaw/src/copaw_worker/bridge.py | 18 +++++++++--------- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/changelog/current.md b/changelog/current.md index 1e0cba579..f67e2315c 100644 --- a/changelog/current.md +++ b/changelog/current.md @@ -14,6 +14,14 @@ Record image-affecting changes to `manager/`, `worker/`, `copaw/`, `openclaw-bas - **modelProvider authorization boundary**: Controller reconcilers now own provider-specific AI route authorization, while provisioning keeps using the default gateway authorization path to avoid duplicate provider coupling. - **Matrix AppService mode**: The controller can register as a Matrix Application Service and provision/log in users with the `as_token` instead of per-user passwords (legacy password auth is preserved when disabled). Enabled by default via `HICLAW_MATRIX_APPSERVICE_ENABLED`; the install script and the Helm `runtime-env` Secret generate and persist `HICLAW_MATRIX_APPSERVICE_AS_TOKEN` / `HICLAW_MATRIX_APPSERVICE_HS_TOKEN`. Set `HICLAW_MATRIX_APPSERVICE_USER_NAMESPACE_REGEX` to narrow the exclusive user namespace when running against a shared / pre-existing homeserver.
+**Copaw → QwenPaw Migration** + +- feat(copaw): switch worker runtime entrypoint from `copaw.app._app:app` to `qwenpaw.app._app:app`, adding `QWENPAW_WORKING_DIR` env alongside `COPAW_WORKING_DIR` for proper working directory resolution. +- feat(copaw): add `qwenpaw` CLI symlink to PATH for TeamHarness adapter compatibility. +- fix(copaw): change `agents.profiles` injection from shallow `setdefault` to nested merge, ensuring `active_agent`, `profiles.default.id`, and `workspace_dir` are present even when `config.json` already has an `agents` section. +- fix(copaw): add copaw `BaseChannel` fallback in `channel.py` so Manager (copaw-only) can load the Matrix channel without qwenpaw installed. +- fix(copaw): restore `agentscope_runtime` imports in copaw fallback path to prevent empty Manager replies. + **Bug Fixes** - **CoPaw worker runtime environment**: CoPaw workers now prefer AgentTeams storage/runtime environment variables while preserving legacy HiClaw fallbacks, and Qwen-style model health preflights disable thinking for lightweight readiness checks. 
diff --git a/copaw/Dockerfile b/copaw/Dockerfile index baeb45fa5..44c10da58 100644 --- a/copaw/Dockerfile +++ b/copaw/Dockerfile @@ -144,6 +144,9 @@ RUN STANDARD_SITE=/opt/venv/standard/lib/python3.11/site-packages/copaw_worker \ # copaw-sync wrapper RUN printf '#!/bin/bash\nexec python3 "/root/.copaw-worker/${HICLAW_WORKER_NAME}/skills/file-sync/scripts/copaw-sync.py" "$@"\n' \ > /usr/local/bin/copaw-sync && chmod +x /usr/local/bin/copaw-sync +# CoPaw runtime CLI (make it available in PATH for skills) +RUN ln -sf /opt/venv/standard/bin/copaw /usr/local/bin/copaw \ + && ln -sf /opt/venv/standard/bin/qwenpaw /usr/local/bin/qwenpaw # Entrypoint script COPY --chmod=755 scripts/copaw-worker-entrypoint.sh /opt/hiclaw/scripts/copaw-worker-entrypoint.sh diff --git a/copaw/scripts/copaw-worker-entrypoint.sh b/copaw/scripts/copaw-worker-entrypoint.sh index 5d5b8bac9..acdd69186 100755 --- a/copaw/scripts/copaw-worker-entrypoint.sh +++ b/copaw/scripts/copaw-worker-entrypoint.sh @@ -23,6 +23,7 @@ source /opt/hiclaw/scripts/lib/hiclaw-env.sh WORKER_NAME="${AGENTTEAMS_WORKER_NAME:-${HICLAW_WORKER_NAME:-}}" [ -n "${WORKER_NAME}" ] || { echo "AGENTTEAMS_WORKER_NAME is required" >&2; exit 1; } INSTALL_DIR="/root/.copaw-worker" +export QWENPAW_WORKING_DIR="${INSTALL_DIR}/${WORKER_NAME}" CONSOLE_PORT="${AGENTTEAMS_CONSOLE_PORT:-${HICLAW_CONSOLE_PORT:-}}" log() { diff --git a/copaw/src/copaw_worker/bridge.py b/copaw/src/copaw_worker/bridge.py index a5656b599..935aad7a5 100644 --- a/copaw/src/copaw_worker/bridge.py +++ b/copaw/src/copaw_worker/bridge.py @@ -454,15 +454,15 @@ def _write_config_json(working_dir: Path) -> None: cfg = json.load(f) except Exception: cfg = {} - cfg.setdefault("agents", { - "active_agent": "default", - "profiles": { - "default": { - "id": "default", - "workspace_dir": str(working_dir / "workspaces" / "default"), - } - }, - }) + # Nested merge: preserve existing agents fields while ensuring required keys exist. 
+ cfg.setdefault("agents", {}) + cfg["agents"].setdefault("active_agent", "default") + cfg["agents"].setdefault("profiles", {}) + cfg["agents"]["profiles"].setdefault("default", {}) + cfg["agents"]["profiles"]["default"].setdefault("id", "default") + cfg["agents"]["profiles"]["default"].setdefault( + "workspace_dir", str(working_dir / "workspaces" / "default") + ) with open(cfg_path, "w") as f: json.dump(cfg, f, indent=2, ensure_ascii=False) From bc5731e0ea36f60154efad6bab589a9f3d1e8a14 Mon Sep 17 00:00:00 2001 From: AgentTeamGuide Date: Fri, 3 Jul 2026 14:32:16 +0000 Subject: [PATCH 08/14] fix: rename duplicate _write_config_json to _ensure_config_json to avoid shadowing original 3-arg version --- copaw/src/copaw_worker/bridge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/copaw/src/copaw_worker/bridge.py b/copaw/src/copaw_worker/bridge.py index 935aad7a5..4fff1c6fe 100644 --- a/copaw/src/copaw_worker/bridge.py +++ b/copaw/src/copaw_worker/bridge.py @@ -444,7 +444,7 @@ def _apply_credential_guard(standard_dir: Path, runtime_dir: Path) -> None: logger.info("bridge: credential guard applied %d protected paths", count) -def _write_config_json(working_dir: Path) -> None: +def _ensure_config_json(working_dir: Path) -> None: """Install config.json from template if missing. Never overwrite.""" _install_from_template(working_dir / "config.json", "config.json") # Ensure agents.profiles section exists (required by qwenpaw). 
From 865708ac6532d47d5a6c9c4af84c16532f917f59 Mon Sep 17 00:00:00 2001 From: LUOSENGWA Date: Fri, 3 Jul 2026 16:04:10 +0000 Subject: [PATCH 09/14] fix: add missing Callable import in bridge.py --- copaw/src/copaw_worker/bridge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/copaw/src/copaw_worker/bridge.py b/copaw/src/copaw_worker/bridge.py index 4fff1c6fe..230a46758 100644 --- a/copaw/src/copaw_worker/bridge.py +++ b/copaw/src/copaw_worker/bridge.py @@ -14,7 +14,7 @@ import shutil from importlib import resources from pathlib import Path -from typing import Any +from typing import Any, Callable def _port_remap(url: str, is_container: bool) -> str: From b24af8f986321c90e559f55ecd043f3b3a640a86 Mon Sep 17 00:00:00 2001 From: LUOSENGWA Date: Sat, 4 Jul 2026 02:24:39 +0000 Subject: [PATCH 10/14] fix: restore bridge_standard_to_runtime lost during rebase --- copaw/src/copaw_worker/bridge.py | 752 +++++++++++++++++++------------ 1 file changed, 472 insertions(+), 280 deletions(-) diff --git a/copaw/src/copaw_worker/bridge.py b/copaw/src/copaw_worker/bridge.py index 230a46758..79dd3248e 100644 --- a/copaw/src/copaw_worker/bridge.py +++ b/copaw/src/copaw_worker/bridge.py @@ -1,21 +1,165 @@ """ -Bridge: translate openclaw.json (HiClaw Worker config) into CoPaw's -config.json + providers.json, then set COPAW_WORKING_DIR so CoPaw -picks up the right workspace. +Bridge between HiClaw's standard space and CoPaw's runtime space. + +The standard space is the OpenClaw-style sync root restored from MinIO, for +example ``/root/.hiclaw-worker//``. It is the durable, runtime-agnostic +layout owned by HiClaw: + + - ``openclaw.json`` + - ``SOUL.md`` / ``AGENTS.md`` / ``HEARTBEAT.md`` + - ``skills/``, ``config/``, ``credentials/``, and other synced files + +The runtime space is CoPaw's native working directory under the standard space, +for example ``/root/.hiclaw-worker//.copaw/``. 
It is the layout that the +CoPaw process actually reads and mutates while running: + + - ``config.json`` + - ``providers.json`` and ``.copaw.secret/providers.json`` + - ``workspaces/default/agent.json`` + - ``workspaces/default/SOUL.md`` / ``AGENTS.md`` / ``HEARTBEAT.md`` + +Standard space -> runtime space: + + - Convert ``openclaw.json`` into CoPaw-native structured config: + ``config.json``, ``providers.json``, and ``workspaces/default/agent.json``. + - Patch CoPaw path constants so the running process reads this runtime space. + - Copy ``providers.json`` into the adjacent secret dir that CoPaw reads. + - Copy prompt files into ``workspaces/default/``. + - Copy ``config/mcporter.json`` into + ``workspaces/default/config/mcporter.json``; the legacy + ``mcporter-servers.json`` source is still accepted. + - Expose Manager-pushed ``skills//`` directories by making + ``workspaces/default/skills`` a symlink to the standard-space ``skills/`` + directory. The standard space remains canonical. + +Runtime space -> standard space: + + - Copy agent-edited prompt files from ``workspaces/default/`` back to the + standard space when the runtime copy is newer. + - Leave MinIO upload to ``sync.push_local``; this bridge only materializes the + standard-space files that the normal push loop will persist. """ -from __future__ import annotations - -import logging -logger = logging.getLogger(__name__) +from __future__ import annotations import json +import logging import os import shutil from importlib import resources from pathlib import Path from typing import Any, Callable +logger = logging.getLogger(__name__) + +# Sentinel returned by derivers to mean "skip this policy this run" (the +# corresponding key is left as-is in agent.json). 
+_MISSING: Any = object() + + +def bridge_standard_to_runtime( + standard_dir: Path, + runtime_dir: Path, + controller_config: dict[str, Any], + *, + skill_names: list[str] | None = None, + profile: str = "worker", + agent: str = "default", +) -> None: + """Materialize standard-space files into CoPaw's runtime space.""" + sync_outer_prompt_files_to_inner(standard_dir, runtime_dir) + bridge_openclaw_to_copaw( + controller_config, + runtime_dir, + profile=profile, + agent=agent, + ) + _apply_credential_guard(standard_dir, runtime_dir) + sync_mcporter_config_to_runtime(standard_dir, runtime_dir) + if skill_names is not None: + sync_skills_to_runtime(standard_dir, runtime_dir, skill_names) + + +def refresh_standard_to_runtime( + standard_dir: Path, + runtime_dir: Path, + controller_config: dict[str, Any], + *, + get_soul: Callable[[], str | None], + get_agents_md: Callable[[], str | None], + skill_names: list[str] | None = None, + profile: str = "worker", + agent: str = "default", +) -> None: + """Refresh runtime space during re-bridge, including legacy prompt fallback.""" + sync_rebridged_prompt_files_to_inner( + standard_dir, + runtime_dir, + get_soul=get_soul, + get_agents_md=get_agents_md, + ) + bridge_openclaw_to_copaw( + controller_config, + runtime_dir, + profile=profile, + agent=agent, + ) + _apply_credential_guard(standard_dir, runtime_dir) + sync_mcporter_config_to_runtime(standard_dir, runtime_dir) + if skill_names is not None: + sync_skills_to_runtime(standard_dir, runtime_dir, skill_names) + + +def bridge_openclaw_to_copaw( + openclaw_cfg: dict[str, Any], + working_dir: Path, + *, + profile: str = "worker", + agent: str = "default", +) -> None: + """Bridge OpenClaw-style config into CoPaw's runtime files.""" + if profile not in ("worker", "manager"): + raise ValueError( + f"unknown bridge profile: {profile!r} (use 'worker' or 'manager')" + ) + + working_dir.mkdir(parents=True, exist_ok=True) + in_container = _is_in_container() + + 
_ensure_config_json(working_dir) + _write_providers_json(openclaw_cfg, working_dir, in_container) + _write_agent_json( + openclaw_cfg, + working_dir, + in_container, + profile=profile, + agent=agent, + ) + + os.environ["COPAW_WORKING_DIR"] = str(working_dir) + _patch_copaw_paths(working_dir) + + secret_dir = _secret_dir(working_dir) + providers_src = working_dir / "providers.json" + if providers_src.exists(): + shutil.copy2(providers_src, secret_dir / "providers.json") + + +def bridge_controller_to_copaw( + controller_config: dict[str, Any], + working_dir: Path, + *, + profile: str = "worker", + agent: str = "default", +) -> None: + """Compatibility alias for bridge_openclaw_to_copaw.""" + bridge_openclaw_to_copaw( + controller_config, + working_dir, + profile=profile, + agent=agent, + ) + def _port_remap(url: str, is_container: bool) -> str: """Remap container-internal :8080 to host-exposed gateway port when needed.""" @@ -35,12 +179,7 @@ def _secret_dir(working_dir: Path) -> Path: def _patch_copaw_paths(working_dir: Path) -> None: - """Patch copaw's module-level path constants to point at working_dir. - - copaw.constant captures WORKING_DIR / SECRET_DIR at import time from - env vars, so setting COPAW_WORKING_DIR after import has no effect. - We must update the live module objects directly. 
- """ + """Patch copaw's module-level path constants to point at working_dir.""" secret_dir = _secret_dir(working_dir) secret_dir.mkdir(parents=True, exist_ok=True) @@ -48,7 +187,9 @@ def _patch_copaw_paths(working_dir: Path) -> None: import copaw.constant as _const _const.WORKING_DIR = working_dir _const.SECRET_DIR = secret_dir - _const.ACTIVE_SKILLS_DIR = working_dir / "active_skills" + _const.ACTIVE_SKILLS_DIR = ( + working_dir / "workspaces" / "default" / "skills" + ) _const.CUSTOMIZED_SKILLS_DIR = working_dir / "customized_skills" _const.MEMORY_DIR = working_dir / "memory" _const.CUSTOM_CHANNELS_DIR = working_dir / "custom_channels" @@ -75,71 +216,240 @@ def _patch_copaw_paths(working_dir: Path) -> None: except (ImportError, AttributeError): pass - # copaw.app.channels.registry binds CUSTOM_CHANNELS_DIR via - # `from ...constant import CUSTOM_CHANNELS_DIR` at import time, so it keeps - # a STALE copy of the default path even after we patch copaw.constant above. - # _discover_custom_channels() / register_custom_channel_routes() read this - # module global at CALL time, so rebinding it here (before ChannelManager - # starts) makes them see our working_dir/custom_channels regardless of - # import order. Without this the patched matrix_channel.py is never - # discovered and copaw falls back to its builtin (broken) Matrix channel. 
- try: - import copaw.app.channels.registry as _channels_registry - _channels_registry.CUSTOM_CHANNELS_DIR = working_dir / "custom_channels" - logger.info( - "bridge: patched channels registry CUSTOM_CHANNELS_DIR -> %s", - _channels_registry.CUSTOM_CHANNELS_DIR, - ) - except ImportError: - pass +def _template_text(name: str) -> str: + """Read a template by basename from the in-tree templates/ directory.""" + return (resources.files("copaw_worker") / "templates" / name).read_text( + encoding="utf-8" + ) -def bridge_openclaw_to_copaw( - openclaw_cfg: dict[str, Any], - working_dir: Path, - *, - profile: str = "manager", + +def _install_from_template(dst: Path, template_name: str) -> bool: + """Copy template -> dst only if dst is missing. Returns True when installed.""" + if dst.exists(): + return False + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_text(_template_text(template_name), encoding="utf-8") + logger.info("bridge: installed %s from template %s", dst, template_name) + return True + + +def sync_mcporter_config_to_runtime(standard_dir: Path, runtime_dir: Path) -> Path | None: + """Copy mcporter config from standard space into CoPaw's default workspace.""" + src_candidates = ( + standard_dir / "config" / "mcporter.json", + standard_dir / "mcporter-servers.json", + ) + src = next((candidate for candidate in src_candidates if candidate.exists()), None) + if src is None: + logger.info("No mcporter config found to copy from %s", standard_dir) + return None + + dst = runtime_dir / "workspaces" / "default" / "config" / "mcporter.json" + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(src, dst) + logger.info("mcporter config copied to %s", dst) + return dst + + +def sync_skills_to_runtime( + standard_dir: Path, + runtime_dir: Path, + skill_names: list[str], +) -> list[str]: + """Expose Manager-pushed skills in CoPaw runtime space via symlink.""" + standard_skills_dir = standard_dir / "skills" + standard_skills_dir.mkdir(parents=True, 
exist_ok=True) + + # MinIO does not preserve Unix permission bits. Restore executable scripts + # in the standard space because runtime skills are a direct symlink to it. + for sh in standard_skills_dir.rglob("*.sh"): + sh.chmod(sh.stat().st_mode | 0o111) + + skill_name_set = set(skill_names) + for child in list(standard_skills_dir.iterdir()): + if child.is_dir() and child.name not in skill_name_set: + shutil.rmtree(child) + logger.info("Removed stale standard skill no longer in MinIO: %s", child.name) + + workspace_skills_dir = runtime_dir / "workspaces" / "default" / "skills" + workspace_skills_dir.parent.mkdir(parents=True, exist_ok=True) + dedup_customized_skills(runtime_dir) + + expected_target = standard_skills_dir.resolve() + if workspace_skills_dir.is_symlink(): + if workspace_skills_dir.resolve() != expected_target: + workspace_skills_dir.unlink() + elif workspace_skills_dir.exists(): + if workspace_skills_dir.is_dir(): + shutil.rmtree(workspace_skills_dir) + else: + workspace_skills_dir.unlink() + + if not workspace_skills_dir.exists(): + target = os.path.relpath(standard_skills_dir, workspace_skills_dir.parent) + workspace_skills_dir.symlink_to(target, target_is_directory=True) + logger.info("Linked runtime skills dir %s -> %s", workspace_skills_dir, target) + + installed = [ + skill_name + for skill_name in skill_names + if (standard_skills_dir / skill_name).exists() + ] + for skill_name in installed: + logger.info("Exposed MinIO skill: %s", skill_name) + enable_workspace_skills_by_default(runtime_dir, installed) + return installed + + +def enable_workspace_skills_by_default( + runtime_dir: Path, + skill_names: list[str], ) -> None: - """ - Read openclaw_cfg (parsed openclaw.json) and write: - - /config.json (global config) - - /workspaces/default/agent.json (per-agent config) - - /providers.json (LLM credentials, for reference) - - .secret/providers.json (where copaw actually reads from) + """Seed CoPaw's workspace manifest so exposed HiClaw skills are 
active.""" + if not skill_names: + return - Also sets COPAW_WORKING_DIR env var and patches copaw's module-level - path constants so the running process uses the correct directory. + workspace_dir = runtime_dir / "workspaces" / "default" + workspace_skills_dir = workspace_dir / "skills" + manifest_path = workspace_dir / "skill.json" - """ - working_dir.mkdir(parents=True, exist_ok=True) - in_container = _is_in_container() + manifest: dict[str, Any] = { + "schema_version": "workspace-skill-manifest.v1", + "version": 1, + "skills": {}, + } + if manifest_path.exists(): + try: + loaded = json.loads(manifest_path.read_text(encoding="utf-8")) + if isinstance(loaded, dict): + manifest.update(loaded) + except json.JSONDecodeError: + logger.warning( + "Invalid CoPaw skill manifest, recreating: %s", + manifest_path, + ) - _write_config_json(openclaw_cfg, working_dir, in_container) - _write_agent_json(openclaw_cfg, working_dir, in_container, profile=profile) - _write_providers_json(openclaw_cfg, working_dir, in_container) + if not isinstance(manifest.get("skills"), dict): + manifest["skills"] = {} + skills = manifest["skills"] + changed = False + for skill_name in sorted(set(skill_names)): + if not (workspace_skills_dir / skill_name / "SKILL.md").exists(): + continue + existing = skills.get(skill_name) + if isinstance(existing, dict): + if existing.get("enabled") is not True: + existing["enabled"] = True + changed = True + if not existing.get("channels"): + existing["channels"] = ["all"] + changed = True + continue + skills[skill_name] = { + "enabled": True, + "channels": ["all"], + "source": "customized", + } + changed = True - os.environ["COPAW_WORKING_DIR"] = str(working_dir) + if changed or not manifest_path.exists(): + workspace_dir.mkdir(parents=True, exist_ok=True) + manifest_path.write_text( + json.dumps(manifest, indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) - # Patch module-level constants (import-time values won't reflect env change) - 
_patch_copaw_paths(working_dir) - # Copy providers.json into secret_dir — that's where copaw actually reads it - secret_dir = _secret_dir(working_dir) - providers_src = working_dir / "providers.json" - if providers_src.exists(): - shutil.copy2(providers_src, secret_dir / "providers.json") +def dedup_customized_skills(runtime_dir: Path) -> None: + """Remove customized skills that shadow CoPaw builtins.""" + customized_dir = runtime_dir / "customized_skills" + if not customized_dir.is_dir(): + return + try: + import copaw.agents.skills as _skills_pkg + builtin_skills_root = Path(_skills_pkg.__file__).resolve().parent + except (ImportError, AttributeError): + return -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- + builtin_names: set[str] = set() + if builtin_skills_root.is_dir(): + for child in builtin_skills_root.iterdir(): + if child.is_dir() and not child.name.startswith("_"): + builtin_names.add(child.name) + + if not builtin_names: + return + + for child in list(customized_dir.iterdir()): + if child.is_dir() and child.name in builtin_names: + shutil.rmtree(child) + logger.info( + "Removed stale customized skill '%s' (now a builtin)", + child.name, + ) + + +def sync_outer_prompt_files_to_inner(local_dir: Path, copaw_working_dir: Path) -> None: + """Copy OpenClaw-style prompt files into CoPaw's default workspace.""" + workspace_dir = copaw_working_dir / "workspaces" / "default" + workspace_dir.mkdir(parents=True, exist_ok=True) + + for name in ("SOUL.md", "AGENTS.md"): + src = local_dir / name + if src.exists(): + (workspace_dir / name).write_text(src.read_text()) + + heartbeat_dst = workspace_dir / "HEARTBEAT.md" + if not heartbeat_dst.exists(): + heartbeat_src = local_dir / "HEARTBEAT.md" + if heartbeat_src.exists(): + heartbeat_dst.write_text(heartbeat_src.read_text()) + + +def sync_rebridged_prompt_files_to_inner( + local_dir: Path, + 
copaw_working_dir: Path, + *, + get_soul: Callable[[], str | None], + get_agents_md: Callable[[], str | None], +) -> None: + """Refresh CoPaw prompt files during re-bridge while preserving legacy fallback.""" + soul_path = local_dir / "SOUL.md" + agents_path = local_dir / "AGENTS.md" + soul = soul_path.read_text() if soul_path.exists() else get_soul() + agents = agents_path.read_text() if agents_path.exists() else get_agents_md() + + workspace_dir = copaw_working_dir / "workspaces" / "default" + if soul: + workspace_dir.mkdir(parents=True, exist_ok=True) + (workspace_dir / "SOUL.md").write_text(soul) + if agents: + workspace_dir.mkdir(parents=True, exist_ok=True) + (workspace_dir / "AGENTS.md").write_text(agents) + + +def _matrix_raw(cfg: dict[str, Any]) -> dict[str, Any]: + return cfg.get("channels", {}).get("matrix", {}) -def _resolve_active_model(cfg: dict[str, Any]) -> dict[str, Any] | None: - """Return the config dict of the active model from openclaw.json, or None. - Prefers agents.defaults.model.primary ("provider_id/model_id"); - falls back to the first model of the first provider. 
- """ +def _matrix_bool( + cfg: dict[str, Any], + camel_key: str, + snake_key: str, + default: bool, +) -> bool: + matrix = _matrix_raw(cfg) + if camel_key in matrix: + return bool(matrix.get(camel_key)) + if snake_key in matrix: + return bool(matrix.get(snake_key)) + return default + + +def _resolve_active_model(cfg: dict[str, Any]) -> dict[str, Any] | None: + """Return the config dict of the active model from openclaw.json, or None.""" providers_raw = cfg.get("models", {}).get("providers", {}) if not providers_raw: return None @@ -158,7 +468,6 @@ def _resolve_active_model(cfg: dict[str, Any]) -> dict[str, Any] | None: if m.get("id") == mid: return m - # Fallback: first provider, first model for provider_cfg in providers_raw.values(): models = provider_cfg.get("models", []) if models: @@ -168,7 +477,6 @@ def _resolve_active_model(cfg: dict[str, Any]) -> dict[str, Any] | None: def _resolve_context_window(cfg: dict[str, Any]) -> int | None: - """Return the contextWindow of the active (or first) model, or None.""" m = _resolve_active_model(cfg) if m and "contextWindow" in m: return int(m["contextWindow"]) @@ -176,100 +484,64 @@ def _resolve_context_window(cfg: dict[str, Any]) -> int | None: def _resolve_vision_enabled(cfg: dict[str, Any]) -> bool: - """Return True if the active model declares image input support. - - The openclaw.json model's ``input`` field is a list of supported modalities - (e.g. ["text", "image"]). If the field is absent we assume text-only to - avoid sending images to a model that cannot handle them. 
- """ + """True if the active model declares image input support.""" m = _resolve_active_model(cfg) if m is None: return False - input_types = m.get("input", []) - return "image" in input_types + return "image" in m.get("input", []) -# --------------------------------------------------------------------------- -# config.json -# --------------------------------------------------------------------------- - -def _write_config_json( +def _resolve_embedding_config( cfg: dict[str, Any], - working_dir: Path, in_container: bool, -) -> None: - matrix_raw = cfg.get("channels", {}).get("matrix", {}) - homeserver = _port_remap( - matrix_raw.get("homeserver", ""), in_container +) -> dict[str, Any] | None: + """Extract embedding config from openclaw's ``agents.defaults.memorySearch``.""" + memory_search = ( + cfg.get("agents", {}) + .get("defaults", {}) + .get("memorySearch", {}) ) - access_token = matrix_raw.get("accessToken", "") - - # DM allowlist - dm_cfg = matrix_raw.get("dm", {}) - dm_policy = dm_cfg.get("policy", "allowlist") - dm_allow_from: list[str] = dm_cfg.get("allowFrom", []) + if not memory_search: + return None - # Group allowlist - group_policy = matrix_raw.get("groupPolicy", "allowlist") - group_allow_from: list[str] = matrix_raw.get("groupAllowFrom", []) + remote = memory_search.get("remote", {}) + base_url = _port_remap(remote.get("baseUrl", ""), in_container) + api_key = remote.get("apiKey", "") + model = memory_search.get("model", "") - # Per-room/group config (pass through as-is for MatrixChannel to use) - groups = matrix_raw.get("groups", {}) + if not base_url or not model: + return None - # History limit: openclaw uses camelCase "historyLimit", bridge to snake_case. 
- history_limit = matrix_raw.get("historyLimit") - if history_limit is None: - history_limit = ( - cfg.get("messages", {}).get("groupChat", {}).get("historyLimit") + if not api_key: + logger.warning( + "memorySearch.remote.apiKey is empty; embedding requests will likely fail", ) - matrix_channel_cfg: dict[str, Any] = { - "enabled": matrix_raw.get("enabled", True), - "homeserver": homeserver, - "access_token": access_token, - "encryption": matrix_raw.get("encryption", False), - "dm_policy": dm_policy, - "allow_from": dm_allow_from, - "group_policy": group_policy, - "group_allow_from": group_allow_from, - "groups": groups, - "filter_tool_messages": True, - "filter_thinking": True, - "vision_enabled": _resolve_vision_enabled(cfg), - } - if history_limit is not None: - matrix_channel_cfg["history_limit"] = int(history_limit) - - config_path = working_dir / "config.json" - # Merge with existing config to avoid clobbering other settings - existing: dict[str, Any] = {} - if config_path.exists(): - with open(config_path) as f: - existing = json.load(f) + dimensions = ( + memory_search.get("outputDimensionality") + or int(os.environ.get("HICLAW_EMBEDDING_DIMENSIONS", "0")) + or 1024 + ) - existing.setdefault("channels", {})["matrix"] = matrix_channel_cfg - # Disable console channel (we use Matrix) - existing["channels"].setdefault("console", {})["enabled"] = False - - # Bridge model context window → agents.running.max_input_length so that - # CoPaw's memory compaction threshold tracks the actual model capability. - # We read contextWindow from the first model of the primary (or first) - # provider to avoid hard-coding a default that mismatches the real model. 
- context_window = _resolve_context_window(cfg) - if context_window is not None: - existing.setdefault("agents", {}).setdefault("running", {})[ - "max_input_length" - ] = context_window - - with open(config_path, "w") as f: - json.dump(existing, f, indent=2, ensure_ascii=False) + return { + "backend": "openai", + "api_key": api_key, + "base_url": base_url, + "model_name": model, + "dimensions": dimensions, + "enable_cache": True, + "use_dimensions": False, + } +def _resolve_history_limit(cfg: dict[str, Any]) -> int | None: + matrix_raw = _matrix_raw(cfg) + hl = matrix_raw.get("historyLimit") + if hl is None: + hl = cfg.get("messages", {}).get("groupChat", {}).get("historyLimit") + return int(hl) if hl is not None else None -# --------------------------------------------------------------------------- -# agent.json — per-agent config (CoPaw 1.0.2+ reads this, not config.json) -# --------------------------------------------------------------------------- def _derive_matrix_user_id(cfg: dict[str, Any], _in_container: bool = False) -> Any: """Derive CoPaw Matrix user_id from OpenClaw config or env.""" m = _matrix_raw(cfg) @@ -447,122 +719,46 @@ def _apply_credential_guard(standard_dir: Path, runtime_dir: Path) -> None: def _ensure_config_json(working_dir: Path) -> None: """Install config.json from template if missing. Never overwrite.""" _install_from_template(working_dir / "config.json", "config.json") - # Ensure agents.profiles section exists (required by qwenpaw). - cfg_path = working_dir / "config.json" - try: - with open(cfg_path) as f: - cfg = json.load(f) - except Exception: - cfg = {} - # Nested merge: preserve existing agents fields while ensuring required keys exist. 
- cfg.setdefault("agents", {}) - cfg["agents"].setdefault("active_agent", "default") - cfg["agents"].setdefault("profiles", {}) - cfg["agents"]["profiles"].setdefault("default", {}) - cfg["agents"]["profiles"]["default"].setdefault("id", "default") - cfg["agents"]["profiles"]["default"].setdefault( - "workspace_dir", str(working_dir / "workspaces" / "default") - ) - with open(cfg_path, "w") as f: - json.dump(cfg, f, indent=2, ensure_ascii=False) def _write_agent_json( - cfg: dict[str, Any], + controller_config: dict[str, Any], working_dir: Path, in_container: bool, *, profile: str = "worker", + agent: str = "default", ) -> None: - """Create agent.json from template, then overlay Matrix channel config. - - CoPaw 1.0.2+ reads workspace/agent.json for per-agent configuration. - The template provides defaults; we overlay controller-owned fields - (Matrix access_token, homeserver, allowlists, context window). - """ - workspace_dir = working_dir / "workspaces" / "default" - workspace_dir.mkdir(parents=True, exist_ok=True) - agent_path = workspace_dir / "agent.json" + """Create agent.json from template if absent; then overlay controller fields.""" + agent_path = working_dir / "workspaces" / agent / "agent.json" + _install_from_template(agent_path, f"agent.{profile}.json") - # Install from template if missing - if not agent_path.exists(): - template_name = f"agent.{profile}.json" - try: - # Try loading from package templates directory - tmpl_dir = Path(__file__).resolve().parent / "templates" - tmpl_path = tmpl_dir / template_name - if tmpl_path.exists(): - shutil.copy2(str(tmpl_path), str(agent_path)) - else: - # Fallback: create minimal agent.json - minimal = { - "id": "default", - "name": "Manager" if profile == "manager" else "Default Agent", - "language": "zh", - "channels": { - "console": {"enabled": True}, - "matrix": { - "enabled": True, - "filter_tool_messages": False, - "filter_thinking": True, - "allow_from": [], - "group_allow_from": [], - "groups": {}, - }, - 
}, - "running": {"max_iters": 200}, - } - with open(agent_path, "w") as f: - json.dump(minimal, f, indent=2) - except Exception: - pass - - # Load existing agent.json try: with open(agent_path) as f: - agent_cfg = json.load(f) - except Exception: - agent_cfg = {"id": "default", "channels": {}, "running": {}} - - # Overlay Matrix channel config from openclaw.json - matrix_raw = cfg.get("channels", {}).get("matrix", {}) - homeserver = _port_remap(matrix_raw.get("homeserver", ""), in_container) - access_token = matrix_raw.get("accessToken", "") - - dm_cfg = matrix_raw.get("dm", {}) - dm_allow_from: list[str] = dm_cfg.get("allowFrom", []) - group_allow_from: list[str] = matrix_raw.get("groupAllowFrom", []) - groups = matrix_raw.get("groups", {}) - - matrix_ch = agent_cfg.setdefault("channels", {}).setdefault("matrix", {}) - matrix_ch["enabled"] = matrix_raw.get("enabled", True) - if homeserver: - matrix_ch["homeserver"] = homeserver - if access_token: - matrix_ch["access_token"] = access_token - matrix_ch["allow_from"] = dm_allow_from - matrix_ch["group_allow_from"] = group_allow_from - matrix_ch["groups"] = groups - matrix_ch["filter_tool_messages"] = True - matrix_ch["filter_thinking"] = True - - # Disable console channel (we use Matrix) - agent_cfg.setdefault("channels", {}).setdefault("console", {})["enabled"] = False - - # Bridge context window - context_window = _resolve_context_window(cfg) - if context_window is not None: - agent_cfg.setdefault("running", {})["max_input_length"] = context_window - - # Set workspace_dir - agent_cfg.setdefault("workspace_dir", str(workspace_dir)) + existing = json.load(f) + if not isinstance(existing, dict): + raise ValueError("agent.json root is not a dict") + except Exception as exc: + logger.warning( + "agent.json at %s is unreadable (%s); re-seeding from template", + agent_path, + exc, + ) + agent_path.unlink(missing_ok=True) + _install_from_template(agent_path, f"agent.{profile}.json") + with open(agent_path) as f: + existing 
= json.load(f) + + for path, policy, deriver in _CONTROLLER_FIELDS: + remote_value = deriver(controller_config, in_container) + _apply_policy(existing, path, policy, remote_value) + + # workspace_dir depends on local filesystem layout; seed once, never rewrite. + existing.setdefault("workspace_dir", str(agent_path.parent)) with open(agent_path, "w") as f: - json.dump(agent_cfg, f, indent=2, ensure_ascii=False) + json.dump(existing, f, indent=2, ensure_ascii=False) -# --------------------------------------------------------------------------- -# providers.json -# --------------------------------------------------------------------------- def _write_providers_json( cfg: dict[str, Any], @@ -576,9 +772,7 @@ def _write_providers_json( active_model = "" for provider_id, provider_cfg in providers_raw.items(): - base_url = _port_remap( - provider_cfg.get("baseUrl", ""), in_container - ) + base_url = _port_remap(provider_cfg.get("baseUrl", ""), in_container) api_key = provider_cfg.get("apiKey", "") models_raw = provider_cfg.get("models", []) @@ -599,13 +793,10 @@ def _write_providers_json( "chat_model": "OpenAIChatModel", } - # Use first provider + first model as active LLM if not active_provider_id and models: active_provider_id = provider_id active_model = models[0]["id"] - # Resolve active model from agents.defaults.model.primary - # Format: "provider_id/model_id" primary = ( cfg.get("agents", {}) .get("defaults", {}) @@ -631,30 +822,25 @@ def _write_providers_json( with open(providers_path, "w") as f: json.dump(providers_data, f, indent=2, ensure_ascii=False) - - -# --------------------------------------------------------------------------- -# Runtime-to-standard sync (worker uses this to push edits back to sync root) -# --------------------------------------------------------------------------- - -def bridge_runtime_to_standard(standard_dir): +def bridge_runtime_to_standard(standard_dir: Path) -> None: """Materialize runtime-space edits back into the standard sync 
root.""" sync_inner_prompt_files_to_outer(standard_dir) -def sync_inner_prompt_files_to_outer(local_dir): +def sync_inner_prompt_files_to_outer(local_dir: Path) -> None: """Copy agent-edited prompt files from CoPaw workspace back to sync root.""" inner_outer_files = ("AGENTS.md", "SOUL.md", "HEARTBEAT.md") - copaw_ws_dir = Path(local_dir) / ".copaw" / "workspaces" / "default" + copaw_ws_dir = local_dir / ".copaw" / "workspaces" / "default" for name in inner_outer_files: inner = copaw_ws_dir / name - outer = Path(local_dir) / name + outer = local_dir / name if not inner.exists(): continue try: inner_mtime = inner.stat().st_mtime except OSError: continue + # Only copy if inner is newer than outer (or outer doesn't exist) outer_mtime = outer.stat().st_mtime if outer.exists() else 0 if inner_mtime > outer_mtime: inner_content = inner.read_text(errors="replace") @@ -662,53 +848,59 @@ def sync_inner_prompt_files_to_outer(local_dir): if inner_content != outer_content: outer.write_text(inner_content) logger.debug( - "Inner->Outer sync: .copaw/workspaces/default/%s -> %s", + "Inner→Outer sync: .copaw/workspaces/default/%s → %s", name, name, ) -# --------------------------------------------------------------------------- -# CLI entry point — used by manager/scripts/init/start-copaw-manager.sh -# --------------------------------------------------------------------------- -def _main_cli(argv=None): + +def _main_cli(argv: list[str] | None = None) -> int: import argparse parser = argparse.ArgumentParser( prog="python -m copaw_worker.bridge", - description="Bridge Controller config into CoPaw runtime files.", + description=( + "Bridge Controller config (openclaw.json today) into CoPaw's " + "config.json / agent.json / providers.json." + ), ) parser.add_argument("--openclaw-json", required=True, - help="Path to openclaw.json") + help="Path to the controller config file (openclaw.json)") parser.add_argument("--working-dir", required=True, help="CoPaw working dir (e.g. 
~/.copaw)") - parser.add_argument("--profile", default="manager", - choices=["worker", "manager"], - help="Template profile (default: manager)") + parser.add_argument("--profile", default="worker", choices=["worker", "manager"], + help="Template profile to use on first boot") + parser.add_argument("--agent", default="default", + help="CoPaw workspace key (maps to workspaces//). " + "Default: 'default'. Exposed for multi-agent setups.") args = parser.parse_args(argv) - from pathlib import Path as _Path - import json as _json - - openclaw_path = _Path(args.openclaw_json) + openclaw_path = Path(args.openclaw_json) if not openclaw_path.exists(): print(f"ERROR: {openclaw_path} not found", flush=True) return 1 - working_dir = _Path(args.working_dir) + working_dir = Path(args.working_dir) working_dir.mkdir(parents=True, exist_ok=True) with open(openclaw_path) as f: - controller_config = _json.load(f) + controller_config = json.load(f) bridge_openclaw_to_copaw( controller_config, working_dir, profile=args.profile, + agent=args.agent, + ) + print( + f"Bridged {openclaw_path} -> {working_dir} " + f"(profile={args.profile}, agent={args.agent})", + flush=True, ) return 0 if __name__ == "__main__": - import sys as _sys - _sys.exit(_main_cli()) + import sys + sys.exit(_main_cli()) From d1c5fec9d28a493fcbce0ab51047a599f1b4acf3 Mon Sep 17 00:00:00 2001 From: LUOSENGWA Date: Sat, 4 Jul 2026 03:50:43 +0000 Subject: [PATCH 11/14] fix: add qwenpaw dependency to copaw-worker Dockerfile and pyproject.toml --- copaw/Dockerfile | 3 ++- copaw/pyproject.toml | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/copaw/Dockerfile b/copaw/Dockerfile index 44c10da58..5ea092262 100644 --- a/copaw/Dockerfile +++ b/copaw/Dockerfile @@ -88,7 +88,8 @@ RUN python -m venv /opt/venv/standard \ && /opt/venv/standard/bin/pip install --no-cache-dir \ --index-url "${PIP_INDEX_URL}" \ --trusted-host "$(echo ${PIP_INDEX_URL} | sed 's|https\?://||;s|/.*||')" \ - /tmp/copaw-worker/ + 
/tmp/copaw-worker/ \ + qwenpaw # --- Patch CoPaw 1.0.2 Matrix channel indentation bug --- # _sync_loop method def is indented one level too deep inside diff --git a/copaw/pyproject.toml b/copaw/pyproject.toml index 02849d492..4953f2dc4 100644 --- a/copaw/pyproject.toml +++ b/copaw/pyproject.toml @@ -11,6 +11,7 @@ license = { text = "Apache-2.0" } requires-python = ">=3.10" dependencies = [ "copaw>=1.0.2,<2.0", + "qwenpaw>=1.1.12", "matrix-nio[e2e]>=0.24.0", "markdown-it-py>=3.0", "linkify-it-py>=2.0", From 06ea624735997b79e3820dd6765c551e76628dbe Mon Sep 17 00:00:00 2001 From: LUOSENGWA Date: Sat, 4 Jul 2026 04:01:09 +0000 Subject: [PATCH 12/14] fix: install qwenpaw separately to avoid dependency conflict with copaw qwenpaw depends on agentscope==1.0.20 which conflicts with copaw's agentscope version when resolved in the same pip install. Installing them in separate pip commands (copaw-worker first, then qwenpaw) mirrors the production deployment approach that works on Node1. --- copaw/Dockerfile | 3 +++ copaw/pyproject.toml | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/copaw/Dockerfile b/copaw/Dockerfile index 5ea092262..d14646761 100644 --- a/copaw/Dockerfile +++ b/copaw/Dockerfile @@ -89,6 +89,9 @@ RUN python -m venv /opt/venv/standard \ --index-url "${PIP_INDEX_URL}" \ --trusted-host "$(echo ${PIP_INDEX_URL} | sed 's|https\?://||;s|/.*||')" \ /tmp/copaw-worker/ \ + && /opt/venv/standard/bin/pip install --no-cache-dir \ + --index-url "${PIP_INDEX_URL}" \ + --trusted-host "$(echo ${PIP_INDEX_URL} | sed 's|https\?://||;s|/.*||')" \ qwenpaw # --- Patch CoPaw 1.0.2 Matrix channel indentation bug --- diff --git a/copaw/pyproject.toml b/copaw/pyproject.toml index 4953f2dc4..02849d492 100644 --- a/copaw/pyproject.toml +++ b/copaw/pyproject.toml @@ -11,7 +11,6 @@ license = { text = "Apache-2.0" } requires-python = ">=3.10" dependencies = [ "copaw>=1.0.2,<2.0", - "qwenpaw>=1.1.12", "matrix-nio[e2e]>=0.24.0", "markdown-it-py>=3.0", 
"linkify-it-py>=2.0", From 59782d50012769460c4ee89a8c39e1c8f16eab51 Mon Sep 17 00:00:00 2001 From: LUOSENGWA Date: Sat, 4 Jul 2026 16:01:16 +0000 Subject: [PATCH 13/14] fix: use qwenpaw channel health API instead of marker file for readiness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The copaw worker adapter's build_worker_readiness() waited for a marker file (/tmp/hiclaw-copaw-{name}-matrix-ready) that was written by copaw's own Matrix channel after initial sync. After switching to qwenpaw, the Matrix channel is qwenpaw/app/channels/matrix/channel.py which has no knowledge of HICLAW_MATRIX_CHANNEL_READY_FILE — the marker was never created, causing /worker/readyz to permanently return not_ready. Fix: use qwenpaw's standard GET /config/channels/matrix/health API instead of the legacy inter-process marker file side-channel. This API is available for every channel in qwenpaw and is the canonical way to check channel readiness. Root cause of CI test failures: test-02, test-15, test-21. --- copaw/src/copaw_worker/health.py | 24 ++++++++++++++++++++++++ copaw/src/copaw_worker/worker.py | 9 +++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/copaw/src/copaw_worker/health.py b/copaw/src/copaw_worker/health.py index 30b1f626a..5b82569f4 100644 --- a/copaw/src/copaw_worker/health.py +++ b/copaw/src/copaw_worker/health.py @@ -446,5 +446,29 @@ def _max_tokens_param(model_id: str) -> str: return "max_tokens" +def check_qwenpaw_matrix_channel( + console_port: int, + *, + timeout: float = 5, +) -> bool: + """Check whether qwenpaw Matrix channel reports healthy via its config API. + + Returns True when the Matrix channel has completed initial sync and is ready + to process messages, False otherwise. Uses the standard qwenpaw + ``GET /config/channels/matrix/health`` endpoint. 
+ """ + url = f"http://127.0.0.1:{console_port}/config/channels/matrix/health" + try: + req = urllib.request.Request( + url, + headers={"Accept": "application/json"}, + method="GET", + ) + with urllib.request.urlopen(req, timeout=timeout) as resp: + data = json.loads(resp.read().decode()) + return data.get("status") == "healthy" + except Exception: + return False + def _now() -> str: return datetime.now(timezone.utc).isoformat() diff --git a/copaw/src/copaw_worker/worker.py b/copaw/src/copaw_worker/worker.py index 47880c661..78d42c102 100644 --- a/copaw/src/copaw_worker/worker.py +++ b/copaw/src/copaw_worker/worker.py @@ -28,6 +28,7 @@ from copaw_worker.config import WorkerConfig from copaw_worker.health import ( ComponentHealth, + check_qwenpaw_matrix_channel, HealthState, check_copaw_service, check_matrix_service, @@ -363,11 +364,11 @@ async def build_worker_readiness(self) -> dict[str, Any]: homeserver = _port_remap(matrix_cfg.get("homeserver", ""), _is_in_container()) matrix = await asyncio.to_thread(check_matrix_service, homeserver) if matrix.healthiness == "healthy": - marker_ready = ( - self._matrix_ready_marker is not None - and self._matrix_ready_marker.exists() + channel_ready = await asyncio.to_thread( + check_qwenpaw_matrix_channel, + self.config.console_port, ) - if not marker_ready: + if not channel_ready: matrix = ComponentHealth( "unhealthy", "Matrix channel is not ready", From 946a9dd7e20741fa063c7569a52bf8924e222a18 Mon Sep 17 00:00:00 2001 From: LUOSENGWA Date: Sat, 4 Jul 2026 17:56:25 +0000 Subject: [PATCH 14/14] fix: use Matrix joined_rooms API for true readiness + add model check logging (#950) - Replace qwenpaw health API (false positive: token check != sync complete) with Matrix /_matrix/client/v3/joined_rooms polling in background task. Marker file written when >= 1 room joined -> true readiness signal. 
- Add logger output to build_worker_readiness() model check to eliminate diagnostic blind spot (previously model check was silent during probes). - test-helpers.sh: print last readyz JSON on probe timeout; increase docker logs --tail from 100 to 200 to preserve startup diagnostics. --- copaw/src/copaw_worker/health.py | 24 +++-------- copaw/src/copaw_worker/worker.py | 74 ++++++++++++++++++++++++++++++-- tests/lib/test-helpers.sh | 7 ++- 3 files changed, 82 insertions(+), 23 deletions(-) diff --git a/copaw/src/copaw_worker/health.py b/copaw/src/copaw_worker/health.py index 5b82569f4..3eea58926 100644 --- a/copaw/src/copaw_worker/health.py +++ b/copaw/src/copaw_worker/health.py @@ -446,27 +446,15 @@ def _max_tokens_param(model_id: str) -> str: return "max_tokens" -def check_qwenpaw_matrix_channel( - console_port: int, - *, - timeout: float = 5, -) -> bool: - """Check whether qwenpaw Matrix channel reports healthy via its config API. +def check_qwenpaw_matrix_channel(marker_path: "str | Path") -> bool: + """Check whether the Matrix ready marker file exists. - Returns True when the Matrix channel has completed initial sync and is ready - to process messages, False otherwise. Uses the standard qwenpaw - ``GET /config/channels/matrix/health`` endpoint. + Returns True when the marker file written by the joined_rooms polling + task is present, signalling the qwenpaw Matrix channel has completed + initial sync and at least one room is joined. 
""" - url = f"http://127.0.0.1:{console_port}/config/channels/matrix/health" try: - req = urllib.request.Request( - url, - headers={"Accept": "application/json"}, - method="GET", - ) - with urllib.request.urlopen(req, timeout=timeout) as resp: - data = json.loads(resp.read().decode()) - return data.get("status") == "healthy" + return Path(marker_path).exists() except Exception: return False diff --git a/copaw/src/copaw_worker/worker.py b/copaw/src/copaw_worker/worker.py index 78d42c102..f3fac25ff 100644 --- a/copaw/src/copaw_worker/worker.py +++ b/copaw/src/copaw_worker/worker.py @@ -358,17 +358,24 @@ async def build_worker_readiness(self) -> dict[str, Any]: model = await asyncio.to_thread(check_model_service, openclaw_cfg) self._health.update("model", model.healthiness, model.message, model.details) + if model.healthiness == "healthy": + logger.info("readiness model check OK worker=%s", self.worker_name) + else: + logger.warning( + "readiness model check failed worker=%s message=%s details=%s", + self.worker_name, model.message, model.details, + ) matrix_cfg = openclaw_cfg.get("channels", {}).get("matrix", {}) from .bridge import _port_remap, _is_in_container homeserver = _port_remap(matrix_cfg.get("homeserver", ""), _is_in_container()) matrix = await asyncio.to_thread(check_matrix_service, homeserver) if matrix.healthiness == "healthy": - channel_ready = await asyncio.to_thread( - check_qwenpaw_matrix_channel, - self.config.console_port, + marker_ready = ( + self._matrix_ready_marker is not None + and self._matrix_ready_marker.exists() ) - if not channel_ready: + if not marker_ready: matrix = ComponentHealth( "unhealthy", "Matrix channel is not ready", @@ -425,6 +432,61 @@ async def _mark_copaw_startup_health( return await asyncio.sleep(interval) + + async def _poll_matrix_joined_rooms(self) -> None: + """Poll Matrix /_matrix/client/v3/joined_rooms until DM room appears. + + Writes _matrix_ready_marker when at least one room is joined. 
+ """ + import json + import urllib.request + + openclaw_cfg = self._openclaw_cfg or {} + matrix_cfg = openclaw_cfg.get("channels", {}).get("matrix", {}) + homeserver = matrix_cfg.get("homeserver", "") + access_token = matrix_cfg.get("accessToken", "") + + if not homeserver or not access_token: + logger.warning( + "cannot poll joined_rooms: missing homeserver or token worker=%s", + self.worker_name, + ) + return + + from .bridge import _port_remap, _is_in_container + homeserver = _port_remap(homeserver, _is_in_container()) + url = f"{homeserver}/_matrix/client/v3/joined_rooms" + + deadline = asyncio.get_running_loop().time() + 60 + while True: + try: + req = urllib.request.Request( + url, + headers={"Authorization": f"Bearer {access_token}"}, + method="GET", + ) + with urllib.request.urlopen(req, timeout=5) as resp: + data = json.loads(resp.read().decode()) + rooms = data.get("joined_rooms", []) + if rooms: + self._matrix_ready_marker.write_text("ready\\n") + logger.info( + "matrix channel ready: %d joined rooms worker=%s", + len(rooms), + self.worker_name, + ) + return + except Exception as exc: + logger.debug("joined_rooms poll failed: %s", exc) + + if asyncio.get_running_loop().time() >= deadline: + logger.warning( + "matrix channel not ready within 60s worker=%s", + self.worker_name, + ) + return + await asyncio.sleep(2) + async def _run_copaw(self) -> None: """Start CoPaw via FastAPI app (includes runner + channels + web console).""" import uvicorn @@ -457,6 +519,10 @@ async def _run_copaw(self) -> None: self._mark_copaw_startup_health(), name=f"copaw-worker-{self.worker_name}-startup-health", ) + matrix_ready_task = asyncio.create_task( + self._poll_matrix_joined_rooms(), + name=f"copaw-worker-{self.worker_name}-matrix-ready", + ) await server.serve() if not server.should_exit and self._health is not None: self._health.update( diff --git a/tests/lib/test-helpers.sh b/tests/lib/test-helpers.sh index 40f8c2635..94c92d883 100755 --- a/tests/lib/test-helpers.sh 
+++ b/tests/lib/test-helpers.sh @@ -527,6 +527,11 @@ while True: last_error = exc if time.monotonic() >= deadline: print(str(last_error), file=sys.stderr) + print("=== DEBUG: last readyz before timeout ===", file=sys.stderr) + try: + print(json.dumps(ready, ensure_ascii=False, indent=2), file=sys.stderr) + except Exception: + print("(unable to serialize last ready state)", file=sys.stderr) sys.exit(10) time.sleep(2) @@ -737,7 +742,7 @@ dump_diagnostics() { worker) local container="hiclaw-worker-${name}" printf "\n--- docker logs %s (last 100 lines) ---\n" "${container}" - docker logs --tail 100 "${container}" 2>&1 || true + docker logs --tail 200 "${container}" 2>&1 || true printf "\n--- container state: %s ---\n" "${container}" docker inspect --format='status={{.State.Status}} exit={{.State.ExitCode}} oom={{.State.OOMKilled}} restarts={{.RestartCount}} startedAt={{.State.StartedAt}} finishedAt={{.State.FinishedAt}} error={{.State.Error}}' "${container}" 2>&1 || true printf "\n--- controller logs (recent, filtered for %s) ---\n" "${name}"