diff --git a/BLOG.md b/BLOG.md
new file mode 100644
index 0000000..3cd64ea
--- /dev/null
+++ b/BLOG.md
@@ -0,0 +1,277 @@
+# From Config to Session: Static Checks and Runtime Security Monitoring for OpenClaw's Architecture Configuration and Skill Supply Chain
+
+Over the past while, as I filled in OpenClaw's visual operations capabilities, one problem became increasingly clear: for an AI agent system, security risk never lives only in "whether the model answers correctly". The deeper risks usually hide in configuration, skills, variables, sessions, and execution paths.
+
+If you view OpenClaw as an agent runtime platform fronting multiple messaging channels, its security concerns naturally span several layers: `openclaw.json` determines how the gateway is exposed, how access is authenticated, and where plugin boundaries lie; the `TOOLS.md`, `AGENTS.md`, and `SKILL.md` files in a workspace define an agent's capability boundary; the variable and template system decides how sensitive configuration is injected; and sessions are the entry point where those capabilities actually get triggered at runtime.
+
+That is why the security work added to `claw-agent-dashboard` this time did not end up as a "security score panel", but gradually evolved into a read-only security audit center that follows OpenClaw's architecture: first understand the architecture and identify the security touchpoints, then wire static checks and runtime monitoring together.
+
+## 1. How this line of evolution took shape
+
+Looking back at recent commits, the security capability did not grow out of thin air; it was forced out step by step as the Dashboard managed OpenClaw more deeply.
+
+The first stage made OpenClaw's "control plane" explicit.
+
+- `9bb5999` introduced Variables and the Template Layer. Variables now support `text` and `secret` types, with masking in both the UI and the API. This step was critical: for the first time, "how configuration enters an agent" became a platform capability, and once variables live in the platform, security checks must account for sensitive-data exposure, scope overrides, and the template rendering chain.
+- `2c243dc` introduced Blueprints and inheritance-based agent creation, with `openclaw_service.py` directly modifying `openclaw.json`, registering agents, and appending bindings. At this point the Dashboard was no longer just "viewing OpenClaw"; it was "changing OpenClaw". Once that line is crossed, `openclaw.json` itself must become an object of security audit.
+
+The second stage wired in OpenClaw's "runtime plane".
+
+- `ade747f` added compose support for sessions, covering both raw mode and envelope mode, with messages sendable straight from the session detail page. The Dashboard was no longer merely browsing historical conversations; it was simulating real channel input.
+- This directly changed the threat model. Security risk is no longer only about what is written in a workspace, but about "what a given conversation actually triggered". Plugin installs, skill installs, warning bypasses, requests to read sensitive files, token exposure: all of these are more likely to appear in sessions than in config files.
+
+The third stage was the security capability itself landing.
+
+- `541c94d` introduced the read-only Security Center, aggregating `openclaw.json`, workspace content, global skills, variables, sessions, and external intelligence sources into a single security audit view.
+
+This evolution line makes one point on its own: security governance for an agent system should not start with "adding a few rules to the model", but from the architecture's control points and its runtime behavior at the same time.
+
+## 2. The security touchpoints in OpenClaw's architecture that actually matter
+
+Judging from this implementation, OpenClaw has at least four classes of security touchpoints worth long-term attention.
+
+### 1. Config control plane: `openclaw.json` defines the system boundary
+
+`openclaw.json` is no ordinary config file. It effectively defines OpenClaw's external exposure and capability boundary, including:
+
+- the gateway `bind` mode, which decides whether the service is visible only locally or exposed to the LAN, a tailnet, or a custom network edge;
+- the gateway `auth` mode, which decides whether access requires a token or password, or sits in a high-risk mode such as `none`;
+- plugin settings such as `plugins.allow`, which decide whether agents can acquire overly broad external extension capabilities at runtime;
+- `bindings` and `agents.list`, which decide which workspaces are actually wired into the running system.
+
+This is why the first step of the security audit is not regex-grepping text but parsing `openclaw.json` as an architectural object. The current implementation strips JSONC comments first, then parses the structure and derives recommendations such as `GATEWAY_AUTH_MODE_NONE`, `GATEWAY_BIND_LAN`, and `OPENCLAW_PLUGINS_ALLOW_WILDCARD`. The intent here is not to rule on "vulnerabilities" but to make architecturally high-risk configuration explicit.
+
+### 2. Skill and workspace content plane: capability definitions are themselves an attack surface
+
+One of OpenClaw's core strengths is defining agent behavior through workspace files and skills. But precisely because that capability is textual, composable, and extensible, it naturally carries two risks:
+
+- the stronger the capability, the easier it is to overstep, e.g. `sudo`, `rm -rf`, `eval()`, subprocess calls, and package install commands;
+- the more sources involved, the easier it is to pick up supply chain problems, e.g. global skills, shared skills, third-party skills, npm dependencies, and external repositories.
+
+Static checks therefore cannot stop at the main workspace; they must also cover:
+
+- `TOOLS.md` and `AGENTS.md` in the agent root;
+- `skills/*/SKILL.md`;
+- the shared/global skills directories;
+- the npm dependencies in each skill's own `package.json`.
+
+This part of the implementation uses a plain but effective approach: run a heuristic content scan first to surface obviously high-risk capability signals, then add a source audit per skill, including extracting GitHub/npm/homepage links from `SKILL.md`, hashing key files with SHA-256 and checking them against VirusTotal, and batch-querying npm dependencies against OSV for known vulnerabilities.
+
+The judgment behind this is simple: an agent's skills already constitute a supply chain, so the security checks should be done the way supply chain checks are done.
+
+### 3. Variable and template plane: sensitive data must not "leak a second time" during the audit
+
+The variable system brings another classic problem: once a platform starts centrally managing secrets, that is both a governance capability and a new leakage surface.
+
+Two choices in this implementation strike me as important:
+
+- secret-typed variables are masked uniformly, so the security audit itself never becomes a page that displays sensitive data;
+- secret-like fields in `openclaw.json` are counted at the path level rather than rendered in plaintext; only the counts and paths of `secretRef` entries and inline secrets are reported.
+
+This means the Security Center emphasizes "structures that carry risk" rather than "showing the secrets to people". This kind of design matters for platform-grade security capabilities, because quite often the first thing an audit system must defend against is itself.
+
+### 4. Session runtime plane: this is where the real risk usually happens
+
+If configuration and skills define the capability boundary, sessions are where attacks actually take place.
+
+In OpenClaw, a session is not just chat history. It actually contains:
+
+- user input;
+- agent output;
+- tool calls;
+- tool results;
+- thinking;
+- and context wrapped in each channel's semantics.
+
+Once the Dashboard itself supports session compose, especially envelope mode, the platform can simulate input arriving from real channels. That makes the following risks very concrete:
+
+- prompt injection and system prompt leakage requests;
+- coaxing the agent into installing plugins, skills, or external integrations;
+- requests to bypass warnings, signatures, or verification;
+- sensitive file reads such as `.ssh`, `.aws`, and `/etc/shadow`;
+- secrets, tokens, and API keys exposed directly in conversation;
+- high-risk operational suggestions such as `curl | sh`, global npm installs, and privilege escalation commands.
+
+So if you only run static checks and never look at sessions, you will ultimately miss the most realistic attack samples and the most critical runtime evidence.
+
+## 3. How static checks should land
+
+Abstracting this implementation into a reusable approach, I would split static checking into four layers.
+
+### Layer 1: structured config audit
+
+Do not grep `openclaw.json` as plain text. The better approach is to:
+
+1. parse it into a structured object, allowing JSONC-style comments;
+2. extract only the security-relevant fields;
+3. emit recommendations rather than a blunt pass/fail verdict.
+
+The benefit is explainability. When we see `bind=lan`, we do not have to declare it a vulnerability outright; we can tell the operator clearly: "You have pushed the boundary from localhost to the LAN, so the auth mode and reverse-proxy strategy now need to be reviewed together."
+
+### Layer 2: heuristic scan of capability files
+
+For files such as `TOOLS.md`, `AGENTS.md`, and `SKILL.md`, an explainable heuristic scan is a better fit than jumping straight to heavyweight model inference, because platform governance favors low cost, high transparency, and maintainability.
+
+A few pattern classes are worth scanning first:
+
+- dangerous commands: `rm -rf`, `chmod 777`, `sudo`;
+- dynamic execution: `eval()`, `subprocess.run`, `child_process`;
+- online installs: `pip install`, `npm install`;
+- external download and execution: `curl`, `wget`;
+- missing baseline control files, e.g. no `TOOLS.md` in the workspace root.
+
+These signals are not equivalent to vulnerabilities, but they are enough to help platform operators narrow the review scope quickly.
+
+### Layer 3: skill supply chain audit
+
+Much of the agent security discussion stops at prompts and permission boundaries, but for a platform like OpenClaw that extends capability through skills, the skill supply chain deserves separate treatment.
+
+I would suggest doing at least three things:
+
+1. build a skill asset inventory;
+2. build a source inventory;
+3. build a dependency vulnerability inventory.
+
+In implementation terms, that means:
+
+- enumerate agent-local skills, shared skills, and global skills;
+- try to extract repository and package-source links from `SKILL.md`;
+- hash the key files and feed the digests to a file intelligence service such as VirusTotal;
+- batch-query the direct dependencies in `package.json` against OSV;
+- map the results back to specific skills instead of emitting only an aggregate table.
+
+Only then can security and platform teams answer the questions that actually matter: where did this skill come from? Who is using it now? Do its dependencies carry known high-severity issues? If something goes wrong, what is the blast radius?
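The explainable heuristic layer described above can be sketched in a few lines of Python. This is a minimal sketch, not the project's actual scanner: the rule table, rule ids, severities, and the hit cap below are illustrative assumptions.

```python
import re

# Illustrative rule table (pattern, rule_id, severity) — names and
# severities are assumptions, not the project's actual rule set.
RULES: list[tuple[re.Pattern, str, str]] = [
    (re.compile(r"\brm\s+-rf\b"), "RM_RF", "warning"),
    (re.compile(r"\bsudo\b"), "SUDO", "info"),
    (re.compile(r"eval\s*\("), "EVAL_CALL", "warning"),
    (re.compile(r"\bpip3?\s+install\b", re.I), "PIP_INSTALL", "info"),
    (re.compile(r"\b(?:curl|wget)\b.+\|\s*(?:sh|bash)\b", re.I), "REMOTE_EXEC_PIPE", "warning"),
]

def scan_markdown(text: str, max_hits: int = 20) -> list[dict]:
    """Line-level scan: each hit keeps the rule id, severity, line number,
    and a preview so a reviewer can verify the signal in context."""
    hits: list[dict] = []
    for line_no, line in enumerate(text.splitlines(), start=1):
        for pattern, rule_id, severity in RULES:
            if pattern.search(line):
                hits.append({
                    "rule": rule_id,
                    "severity": severity,
                    "line": line_no,
                    "preview": line.strip()[:240],
                })
                if len(hits) >= max_hits:
                    return hits
    return hits

if __name__ == "__main__":
    doc = "Setup:\n\ncurl https://example.com/setup.sh | sh\nsudo rm -rf /tmp/cache\n"
    for h in scan_markdown(doc):
        print(h["line"], h["rule"], h["severity"])
```

Because every hit carries its line number and a preview, a reviewer can jump straight to the matched context instead of having to trust an opaque score.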
+
+### Layer 4: structural checks on sensitive data
+
+Secret checking is best done without "scanning plaintext values" at all; check structure and governance state first, for example:
+
+- whether inline secrets exist;
+- whether secret references are supported;
+- the count, scope, and override relationships of secret variables;
+- which templates and blueprints actually reference those variables.
+
+This satisfies governance needs while lowering the exposure risk created by the audit process itself.
+
+## 4. Why runtime security monitoring is still needed
+
+Static checks answer "how the system is defined"; runtime monitoring answers "what the system actually did". For an agent platform, the latter is usually closer to the real risk.
+
+In this implementation, session risk detection uses a layered strategy that I think is worth promoting in engineering practice.
+
+### 1. Prefer the platform's existing session APIs over rebuilding the parsing pipeline
+
+The current implementation reads sessions and messages through the status service first, and only falls back to scanning `.jsonl` transcripts directly when that is unavailable. The value of doing it this way:
+
+- it stays consistent with the existing display pipeline;
+- it avoids maintaining a separate transcript-parsing standard just for the security feature;
+- when OpenClaw's transcript structure changes, the blast radius stays manageable.
+
+### 2. Rule-based detection first, lightweight inference second
+
+Runtime risk detection uses two layers:
+
+- explicit rules, e.g. `curl|sh`, plugin installs, skill installs, `.ssh` reads, token exposure;
+- a lightweight embedded "risk model" that scores feature combinations, e.g. install actions, plugin targets, external sources, privilege commands, warning bypasses, and secret assignments.
+
+I like this combination. Rules alone miss variants; a model alone struggles to explain why something was judged high-risk. Combining the two keeps determinism while retaining some generalization.
+
+### 3. Output evidence snippets, not just verdicts
+
+A useful runtime monitoring result should carry at least:
+
+- the agent name;
+- the session file;
+- the role;
+- the timestamp;
+- the risk rule;
+- the risk score;
+- the matched evidence context;
+- the signals the model extracted.
+
+That is far more useful than "this session carries risk", because the next step in security governance is usually not scoring but review, localization, and response.
+
+### 4. Focus monitoring on "boundary-crossing intent"
+
+For an agent platform, what really deserves attention is not every anomalous string but the requests that try to change the system boundary, for example:
+
+- asking to ignore the system prompt or bypass policy;
+- asking to install new plugins or skills;
+- asking to read sensitive files or export secrets;
+- asking to run remote scripts, escalate privileges, or perform global installs.
+
+These signals are, at bottom, attempts to expand the agent's agency, which makes them worth prioritizing over ordinary "dangerous word appeared" hits.
+
+## 5. An implementation path better suited to OpenClaw
+
+To put this capability into real production use, I would suggest proceeding in the four steps below.
+
+### Step 1: read-only inventory to establish a factual baseline
+
+Do not start with blocking, and do not rush into "automatic remediation". The first job is to inventory the security facts:
+
+- the current OpenClaw configuration;
+- current gateway exposure and authentication;
+- current skill assets and their sources;
+- how secrets are currently used;
+- which high-risk behavior patterns have already appeared in sessions.
+
+The goal of this step is not to solve every problem, but to avoid abstract security debates conducted without visibility.
+
+### Step 2: embed static checks into the change workflow
+
+Once the baseline is reasonably stable, static checks can shift left to a few key points:
+
+- before a blueprint is published;
+- before a skill is introduced or upgraded;
+- before `openclaw.json` changes;
+- before variable templates are rendered and synced.
+
+Hard blocking is not mandatory at this stage, but at minimum you want:
+
+- a diff;
+- risk warnings;
+- an impact analysis;
+- a named reviewer.
+
+### Step 3: turn session monitoring into a continuous operations capability
+
+The value of runtime monitoring lies not in "an occasional scan" but in continuously discovering new risk patterns. Suggestions:
+
+- scan for high-risk session signals on a schedule;
+- build allowlist/exception mechanisms for rules and signals;
+- aggregate results into the Security Center or an alerting system;
+- feed newly observed patterns back into the static rule base.
+
+For an agent platform, much security knowledge shows up in real conversations before it ever shows up in documentation.
+
+### Step 4: connect risk discovery to a governance loop
+
+What creates real value is not "yet another security page" but connecting discovery to governance, for example:
+
+- when a class of plugin/skill install requests keeps recurring, go back and tighten the allowlist;
+- when secret exposure keeps surfacing in sessions, go back and improve variable injection and masking design;
+- when a skill dependency repeatedly hits OSV, push for an upgrade or retirement;
+- when a class of workspaces lacks `TOOLS.md`, make it a required check in the creation and derivation flows.
+
+Security capability only stops living in reports when it feeds back into platform design itself.
+
+## 6. The design trade-offs I value most in this implementation
+
+Looking back, this implementation is not especially complex, but several trade-offs matter.
+
+First, it insists on read-only first. The Security Center does not currently mutate OpenClaw's running state, which lowers the risk of "the security capability itself becoming a damage surface" and makes it easier to land in real environments first.
+
+Second, it emphasizes structure and explainability. Whether for config recommendations, content heuristics, or session risk evidence, the output preserves as much context as possible instead of handing out black-box scores.
+
+Third, it grounds the security problem in OpenClaw's architectural facts rather than discussing LLM risk in isolation. The real risks are not just prompt injection; they include gateway exposure, plugin over-permissioning, the skill supply chain, secret management, and runtime boundary-crossing behavior.
+
+Fourth, it treats sessions as a first-class security data source. For agent systems this is a crucial shift in mindset: many attacks are not "written in the config" but "happen in the conversation".
+
+## 7. Closing thoughts
+
+If you treat OpenClaw as merely a chatbot platform, security governance easily shrinks to prompts, jailbreaks, and output review. But once you see it as a genuine agent runtime architecture, you find that security concerns spread across the entire chain of configuration, capability, supply chain, variables, and sessions.
+
+That is the core takeaway from this round of work: securing an agent platform means watching both "how the system is configured" and "how the system is used", not just the model; understanding runtime intent in real sessions, not just writing static rules; and building operator-facing visibility, not just hunting vulnerabilities.
+
+For a system like OpenClaw, static checks settle how the boundary is defined; runtime monitoring settles whether that boundary is being breached. Only when the two are combined does security governance become genuinely engineerable.
diff --git a/backend/app/main.py b/backend/app/main.py index
f3f647a..bbe1636 100755 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -8,7 +8,7 @@ from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse -from .routers import agents, translate, settings, global_skills, status, versions, variables, templates, blueprints, agent_changes, search +from .routers import agents, translate, settings, global_skills, status, versions, variables, templates, blueprints, agent_changes, search, security from .services import version_db # CORS origins — defaults to ["*"] for development. @@ -49,6 +49,7 @@ async def lifespan(app: FastAPI): app.include_router(blueprints.router, prefix="/api") app.include_router(agent_changes.router, prefix="/api") app.include_router(search.router, prefix="/api/search") +app.include_router(security.router, prefix="/api") # Serve frontend static files (production build) STATIC_DIR = Path(__file__).resolve().parent.parent.parent / "static" diff --git a/backend/app/routers/security.py b/backend/app/routers/security.py new file mode 100644 index 0000000..c58b922 --- /dev/null +++ b/backend/app/routers/security.py @@ -0,0 +1,12 @@ +"""Read-only security audit API.""" +from fastapi import APIRouter + +from ..services.security_audit import build_audit_report + +router = APIRouter(tags=["security"]) + + +@router.get("/security/audit") +async def get_security_audit(): + """Aggregated read-only report: agents, skills, variables, env hints.""" + return await build_audit_report() diff --git a/backend/app/services/security_audit.py b/backend/app/services/security_audit.py new file mode 100644 index 0000000..63b7477 --- /dev/null +++ b/backend/app/services/security_audit.py @@ -0,0 +1,1595 @@ +"""Read-only security audit — aggregates agents, skills, variables, and env hints.""" +from __future__ import annotations + +import asyncio +import hashlib +import json +import os +import re +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import httpx + 
+# Heuristic patterns in workspace markdown (TOOLS.md, AGENTS.md, skills/*/SKILL.md). +# Severity is informational — review context manually. +_CONTENT_RULES: tuple[tuple[re.Pattern[str], str, str], ...] = ( + (re.compile(r"\bcurl\b", re.I), "CURL", "info"), + (re.compile(r"\bwget\b", re.I), "WGET", "info"), + (re.compile(r"\brm\s+-rf\b"), "RM_RF", "warning"), + (re.compile(r"\bchmod\s+777\b"), "CHMOD_777", "info"), + (re.compile(r"\bsudo\b"), "SUDO", "info"), + (re.compile(r"eval\s*\("), "EVAL_CALL", "warning"), + (re.compile(r"subprocess\.(?:call|run|Popen)", re.I), "PYTHON_SUBPROCESS", "info"), + (re.compile(r"child_process|execSync|spawn\s*\(", re.I), "NODE_CHILD_PROCESS", "info"), + (re.compile(r"\bpip3?\s+install\b", re.I), "PIP_INSTALL", "info"), + (re.compile(r"\bnpm\s+(?:install|i)\b", re.I), "NPM_INSTALL", "info"), +) + +_MAX_SCAN_BYTES = 512_000 +_MAX_SCAN_LINES = 8_000 +_MAX_HITS_PER_FILE = 14 +_MAX_TOTAL_SIGNALS = 400 +_MAX_VT_FILES = 80 +_MAX_OSV_DIRECT_DEPS = 60 +_MAX_OSV_QUERY_PACKAGES = 180 +_MAX_OPENCLAW_RAW_PREVIEW_BYTES = 200_000 +_HTTP_TIMEOUT_SECONDS = 8.0 +_SECRET_KEYWORDS = ("secret", "token", "password", "api_key", "apikey", "key") +_SECRET_REF_KEYS = ("secretRef", "keyRef", "tokenRef", "passwordRef", "valueFrom") +_MAX_SESSION_FILES = 24 +_MAX_SESSION_LINES_PER_FILE = 800 +_MAX_SESSION_RISK_HITS = 120 +_SESSION_RISK_CONTEXT_RADIUS = 120 + +_SESSION_RISK_RULES: tuple[tuple[re.Pattern[str], str, str], ...] 
= ( + (re.compile(r"\b(sudo|su\s+-|doas)\b", re.I), "SESSION_PRIVILEGE_ESCALATION", "warning"), + (re.compile(r"\b(chmod\s+777|chown\s+root|setcap\s+)\b", re.I), "SESSION_PRIVILEGE_ESCALATION", "warning"), + (re.compile(r"\b(curl|wget).+\|\s*(sh|bash|zsh)\b", re.I), "SESSION_REMOTE_EXEC_PIPE", "warning"), + (re.compile(r"\bnpm\s+(?:install|i)\s+(-g|--global)\b", re.I), "SESSION_INSTALL_GLOBAL_PACKAGE", "warning"), + (re.compile(r"\b(pnpm|yarn|npm)\s+(?:add|install|i)\b", re.I), "SESSION_INSTALL_PACKAGE", "info"), + (re.compile(r"\b(?:openclaw\s+skill\s+install|skill\s+install)\b", re.I), "SESSION_INSTALL_SKILL", "warning"), + (re.compile(r"\b(?:openclaw\s+plugin\s+install|plugin\s+install)\b", re.I), "SESSION_INSTALL_PLUGIN", "warning"), + (re.compile(r"\bclawhub\s+install\b", re.I), "SESSION_INSTALL_SKILL", "warning"), + (re.compile(r"\bclawhub\s+install\s+github\b", re.I), "SESSION_EXTERNAL_INTEGRATION_INSTALL", "warning"), + (re.compile(r"\b(set\s+up|install|enable)\b.{0,48}\b(plugin|composio|openclaw-plugin)\b", re.I), "SESSION_PLUGIN_SETUP_REQUEST", "warning"), + (re.compile(r"\b(ignore|bypass|disable)\b.{0,48}\b(warning|mismatch|verification|signature)\b", re.I), "SESSION_WARNING_BYPASS_REQUEST", "warning"), + (re.compile(r"\bhttps?://[^\s]*registry\.npmjs\.org/[^\s]+", re.I), "SESSION_EXTERNAL_PACKAGE_INSTRUCTION", "info"), + # OWASP LLM01 - Prompt Injection / Jailbreak patterns + (re.compile(r"\b(jailbreak|jail-broken|override|disregard|bypass|ignore)\b.{0,80}\b(system\s+prompt|developer\s+message|instructions?|policy|safety|rules?|restrictions?)\b", re.I), "SESSION_PROMPT_INJECTION_ATTEMPT", "warning"), + (re.compile(r"\b(reveal|show|print|leak|expose)\b.{0,80}\b(system\s+prompt|developer\s+message)\b", re.I), "SESSION_SYSTEM_PROMPT_LEAKAGE_REQUEST", "warning"), + (re.compile(r"\b(ignore|bypass|disable)\b.{0,80}\b(instruction|policy|safety|rules?)\b", re.I), "SESSION_PROMPT_INJECTION_ATTEMPT", "warning"), + # OWASP LLM02 - Sensitive Information Disclosure 
/ Credential exfiltration + (re.compile(r"\b(begin\s+rsa\s+private\s+key|begin\s+openssh\s+private\s+key|private\s+key)\b", re.I), "SESSION_SENSITIVE_KEY_MATERIAL", "warning"), + (re.compile(r"\b(authorization|bearer)\b\s*[:=]\s*['\"]?[A-Za-z0-9\-_\.]{8,}", re.I), "SESSION_SENSITIVE_AUTH_HEADER", "warning"), + (re.compile(r"\b(exfiltrat|exfiltrate|exfil)\b.{0,80}\b(http|https|upload|send)\b", re.I), "SESSION_DATA_EXFILTRATION_ATTEMPT", "warning"), + # OWASP LLM06 - Excessive Agency / Sensitive file read + (re.compile(r"\b(~\/\.ssh|\.ssh\/|~\/\.aws|\.aws\/|\/etc\/(passwd|shadow)|\/root\/|\/home\/[^/ ]+\/\.ssh)\b", re.I), "SESSION_SENSITIVE_FILE_READ_REQUEST", "warning"), + (re.compile(r"\b(read|cat|open|dump)\b.{0,40}\b(~\/\.ssh|\.ssh\/|\.aws\/|\/etc\/(passwd|shadow)|/root/)\b", re.I), "SESSION_SENSITIVE_FILE_READ_REQUEST", "warning"), + (re.compile(r"\bck__[A-Za-z0-9_-]{10,}\b"), "SESSION_SECRET_EXPOSURE", "warning"), + (re.compile(r"\b(sk-[A-Za-z0-9_-]{20,}|AIza[0-9A-Za-z_-]{20,})\b"), "SESSION_SECRET_EXPOSURE", "warning"), + (re.compile(r"\b(api[_-]?key|appsecret|token|password|secret)\b.{0,24}[:=]\s*['\"]?[A-Za-z0-9_\-./+=]{8,}", re.I), "SESSION_SECRET_EXPOSURE", "warning"), +) + +# Lightweight embedded "risk model" (feature scoring, no external LLM dependency). +# This generalizes beyond single-keyword exact matches. 
+_SESSION_MODEL_FEATURES: dict[str, tuple[re.Pattern[str], int]] = { + "install_action": (re.compile(r"\b(install|setup|set up|enable|add)\b", re.I), 2), + "plugin_skill_target": (re.compile(r"\b(plugin|skill|clawhub|composio|npm|pnpm|yarn)\b", re.I), 2), + "external_pkg_source": (re.compile(r"\b(registry\.npmjs\.org|github\.com|gitlab\.com)\b", re.I), 2), + "privilege_cmd": (re.compile(r"\b(sudo|su\s+-|doas|setcap|chown\s+root|chmod\s+777)\b", re.I), 4), + "remote_exec_pipe": (re.compile(r"\b(curl|wget).+\|\s*(sh|bash|zsh)\b", re.I), 5), + "warning_bypass": (re.compile(r"\b(ignore|bypass|disable|skip)\b.{0,48}\b(warning|verify|verification|mismatch|signature)\b", re.I), 4), + "secret_literal": (re.compile(r"\b(sk-[A-Za-z0-9_-]{16,}|ck__[A-Za-z0-9_-]{10,}|AIza[0-9A-Za-z_-]{20,})\b"), 5), + "secret_assignment": (re.compile(r"\b(api[_-]?key|appsecret|token|password|secret)\b.{0,24}[:=]\s*['\"]?[A-Za-z0-9_\-./+=]{8,}", re.I), 4), + "sensitive_share": (re.compile(r"\b(my|here is|is:|provide|consumer)\b.{0,24}\b(api key|token|secret|password)\b", re.I), 2), +} + +from ..config import ( + AGENTS_DIR, + DATA_DIR, + GATEWAY_TOKEN, + GATEWAY_URL, + GLOBAL_SKILLS_DIR, + OPENCLAW_CONFIG_PATH, + SESSION_DATA_DIR, + SHARED_SKILLS_DIR, + resolve_agent_dir, +) +from . import file_service, global_skills as gs, scanner, status as status_service, variable_service, version_db + + +def _cors_allow_all() -> tuple[bool, int | None]: + raw = os.environ.get("ALLOWED_ORIGINS", "*").strip() + if raw == "*": + return True, None + parts = [o.strip() for o in raw.split(",") if o.strip()] + return False, len(parts) + + +def _load_openclaw_json(config_path: Path) -> tuple[dict[str, Any] | None, str | None]: + """Parse openclaw.json (JSONC comments stripped). 
Returns (data, error_message).""" + if not config_path.exists(): + return None, None + try: + raw = config_path.read_text(encoding="utf-8", errors="replace") + clean = _strip_jsonc_comments(raw) + return json.loads(clean), None + except Exception as e: + return None, str(e)[:200] + + +def _credential_configured(value: Any) -> bool: + """True if token/password appears set (string non-empty or SecretRef dict).""" + if value is None: + return False + if isinstance(value, dict): + return True + return bool(str(value).strip()) + + +def _openclaw_security_recommendations(data: dict[str, Any]) -> list[dict[str, Any]]: + """Derive security-focused checks from parsed openclaw.json (no secret values).""" + recs: dict[str, dict[str, Any]] = {} + + gw = data.get("gateway") + if not isinstance(gw, dict): + recs["OPENCLAW_GATEWAY_BLOCK_MISSING"] = { + "id": "OPENCLAW_GATEWAY_BLOCK_MISSING", + "severity": "info", + } + return sorted(recs.values(), key=lambda x: (0 if x["severity"] == "warning" else 1, x["id"])) + + bind = gw.get("bind") + if isinstance(bind, str): + bind_norm = bind.strip().lower() + else: + bind_norm = None + + auth = gw.get("auth") + if "auth" not in gw: + recs["GATEWAY_AUTH_BLOCK_MISSING"] = { + "id": "GATEWAY_AUTH_BLOCK_MISSING", + "severity": "warning", + } + auth = {} + elif not isinstance(auth, dict): + auth = {} + else: + if len(auth) == 0: + recs["GATEWAY_AUTH_EMPTY"] = { + "id": "GATEWAY_AUTH_EMPTY", + "severity": "warning", + } + + mode = auth.get("mode") if isinstance(auth, dict) else None + if isinstance(mode, str): + mode_norm = mode.strip().lower() + else: + mode_norm = None + + token_ok = _credential_configured(auth.get("token")) if isinstance(auth, dict) else False + password_ok = _credential_configured(auth.get("password")) if isinstance(auth, dict) else False + + if mode_norm == "none": + recs["GATEWAY_AUTH_MODE_NONE"] = { + "id": "GATEWAY_AUTH_MODE_NONE", + "severity": "warning", + } + if bind_norm in ("lan", "custom", "tailnet", "auto"): + 
recs["GATEWAY_AUTH_NONE_NON_LOOPBACK"] = { + "id": "GATEWAY_AUTH_NONE_NON_LOOPBACK", + "severity": "warning", + } + + if bind_norm == "lan": + recs["GATEWAY_BIND_LAN"] = { + "id": "GATEWAY_BIND_LAN", + "severity": "info", + } + + if token_ok and password_ok and mode_norm is None: + recs["GATEWAY_AUTH_MODE_AMBIGUOUS"] = { + "id": "GATEWAY_AUTH_MODE_AMBIGUOUS", + "severity": "warning", + } + + if mode_norm == "token" and not token_ok: + recs["GATEWAY_TOKEN_UNSET"] = { + "id": "GATEWAY_TOKEN_UNSET", + "severity": "warning", + } + + if mode_norm == "password" and not password_ok: + recs["GATEWAY_PASSWORD_UNSET"] = { + "id": "GATEWAY_PASSWORD_UNSET", + "severity": "warning", + } + + if mode_norm is None and not token_ok and not password_ok and isinstance(auth, dict) and len(auth) > 0: + # auth block present with keys but no resolved mode/credentials + recs["GATEWAY_AUTH_UNCONFIGURED"] = { + "id": "GATEWAY_AUTH_UNCONFIGURED", + "severity": "info", + } + + if mode_norm == "trusted-proxy": + recs["GATEWAY_TRUSTED_PROXY_MODE"] = { + "id": "GATEWAY_TRUSTED_PROXY_MODE", + "severity": "info", + } + + if data.get("debug") is True: + recs["OPENCLAW_DEBUG_ENABLED"] = { + "id": "OPENCLAW_DEBUG_ENABLED", + "severity": "warning", + } + + # OWASP LLM06 - Excessive Agency / Unbounded tool/plugin capabilities + plugins = data.get("plugins") + if isinstance(plugins, dict): + allow = plugins.get("allow") + is_empty_allow = False + is_wildcard_allow = False + if allow is None: + pass + elif isinstance(allow, list) and len(allow) == 0: + is_empty_allow = True + elif isinstance(allow, dict) and len(allow) == 0: + is_empty_allow = True + elif isinstance(allow, str) and not allow.strip(): + is_empty_allow = True + elif allow == "*" or (isinstance(allow, str) and allow.strip() == "*"): + is_wildcard_allow = True + if is_empty_allow: + recs["OPENCLAW_PLUGINS_ALLOW_EMPTY"] = { + "id": "OPENCLAW_PLUGINS_ALLOW_EMPTY", + "severity": "warning", + } + if is_wildcard_allow: + 
recs["OPENCLAW_PLUGINS_ALLOW_WILDCARD"] = { + "id": "OPENCLAW_PLUGINS_ALLOW_WILDCARD", + "severity": "warning", + } + + return sorted(recs.values(), key=lambda x: (0 if x["severity"] == "warning" else 1, x["id"])) + + +def _is_secret_like_key(key: str) -> bool: + k = key.strip().lower() + return any(part in k for part in _SECRET_KEYWORDS) + + +def _count_openclaw_secrets(data: Any) -> dict[str, Any]: + """Recursively count secret-like values/references in openclaw config (no plaintext output).""" + refs: list[str] = [] + inline: list[str] = [] + + def walk(node: Any, path: str): + if isinstance(node, dict): + lower_keys = {str(k).lower() for k in node.keys()} + # SecretRef-like object + if any(r.lower() in lower_keys for r in _SECRET_REF_KEYS): + refs.append(path or "$") + for k, v in node.items(): + key = str(k) + child_path = f"{path}.{key}" if path else key + if _is_secret_like_key(key): + # plain inline secret-ish string + if isinstance(v, str) and v.strip(): + inline.append(child_path) + # reference object under secret-ish field (still secret managed) + elif isinstance(v, dict) and any(r.lower() in {str(x).lower() for x in v.keys()} for r in _SECRET_REF_KEYS): + refs.append(child_path) + walk(v, child_path) + elif isinstance(node, list): + for idx, it in enumerate(node): + walk(it, f"{path}[{idx}]") + + walk(data, "") + # dedupe while preserving order + def uniq(items: list[str]) -> list[str]: + seen: set[str] = set() + out: list[str] = [] + for x in items: + if x in seen: + continue + seen.add(x) + out.append(x) + return out + + refs_u = uniq(refs) + inline_u = uniq(inline) + return { + "secret_ref_count": len(refs_u), + "inline_secret_like_count": len(inline_u), + "secret_ref_paths": refs_u[:60], + "inline_secret_like_paths": inline_u[:60], + } + + +def _strip_jsonc_comments(raw: str) -> str: + lines = [] + for line in raw.splitlines(): + stripped = line.lstrip() + if stripped.startswith("//"): + continue + idx = 0 + while True: + comment_idx = 
line.find("//", idx) + if comment_idx < 0: + break + if comment_idx > 0 and line[comment_idx - 1] == ":": + idx = comment_idx + 2 + continue + if '"' not in line[comment_idx:]: + line = line[:comment_idx] + break + lines.append(line) + return "\n".join(lines) + + +def _openclaw_preview(config_path: Path) -> dict[str, Any]: + out: dict[str, Any] = { + "path": str(config_path), + "exists": config_path.exists(), + "parsed": False, + "top_level_keys": [], + "agents_in_registry_count": 0, + "agents_registry_declared": False, + "bindings_count": 0, + "parse_error": None, + "gateway_bind": None, + "gateway_auth_mode": None, + "gateway_auth_token_configured": None, + "gateway_auth_password_configured": None, + "secrets": { + "secret_ref_count": 0, + "inline_secret_like_count": 0, + "secret_ref_paths": [], + "inline_secret_like_paths": [], + }, + "security_recommendations": [], + "raw_text": None, + "raw_text_truncated": False, + } + if not out["exists"]: + return out + try: + raw = config_path.read_text(encoding="utf-8", errors="replace") + if len(raw.encode("utf-8", errors="replace")) > _MAX_OPENCLAW_RAW_PREVIEW_BYTES: + encoded = raw.encode("utf-8", errors="replace") + clipped = encoded[:_MAX_OPENCLAW_RAW_PREVIEW_BYTES] + out["raw_text"] = clipped.decode("utf-8", errors="ignore") + out["raw_text_truncated"] = True + else: + out["raw_text"] = raw + except Exception: + out["raw_text"] = None + + data, parse_err = _load_openclaw_json(config_path) + if data is None: + out["parse_error"] = parse_err or "Failed to read or parse openclaw.json" + return out + try: + out["parsed"] = True + out["top_level_keys"] = sorted(data.keys())[:50] + agents_obj = data.get("agents") + if isinstance(agents_obj, dict) and "list" in agents_obj: + out["agents_registry_declared"] = True + agents_list = agents_obj.get("list", []) if isinstance(agents_obj, dict) else [] + if isinstance(agents_list, list): + out["agents_in_registry_count"] = len(agents_list) + bindings = data.get("bindings", []) + 
if isinstance(bindings, list): + out["bindings_count"] = len(bindings) + + gw = data.get("gateway") + if isinstance(gw, dict): + out["gateway_bind"] = gw.get("bind") + auth = gw.get("auth") + if isinstance(auth, dict): + out["gateway_auth_mode"] = auth.get("mode") + out["gateway_auth_token_configured"] = _credential_configured(auth.get("token")) + out["gateway_auth_password_configured"] = _credential_configured(auth.get("password")) + out["secrets"] = _count_openclaw_secrets(data) + out["security_recommendations"] = _openclaw_security_recommendations(data) + if (out["secrets"].get("inline_secret_like_count") or 0) > 0: + out["security_recommendations"].append({ + "id": "OPENCLAW_INLINE_SECRETS_DETECTED", + "severity": "warning", + }) + if (out["secrets"].get("secret_ref_count") or 0) > 0: + out["security_recommendations"].append({ + "id": "OPENCLAW_SECRET_REFS_PRESENT", + "severity": "info", + }) + out["security_recommendations"] = sorted( + out["security_recommendations"], + key=lambda x: (0 if x["severity"] == "warning" else 1, x["id"]), + ) + except Exception as e: + out["parse_error"] = str(e)[:200] + return out + + +def _count_files_in_skill_tree(items: list[dict]) -> int: + n = 0 + for it in items: + if it.get("type") == "file": + n += 1 + elif it.get("type") == "directory" and it.get("children"): + n += _count_files_in_skill_tree(it["children"]) + return n + + +async def _agent_skill_rows(agent_name: str) -> list[dict[str, Any]]: + skills = await file_service.list_agent_skills_async(agent_name) + rows = [] + for s in skills: + name = s["name"] + tree = await file_service.list_skill_files_async(agent_name, name) + rows.append({ + "name": name, + "display_name": s.get("display_name", name), + "file_count": _count_files_in_skill_tree(tree), + }) + return rows + + +async def _global_source_rows() -> list[dict[str, Any]]: + sources_out = [] + for src in gs.list_sources(): + source = src["source"] + skills = gs.list_skills(source) + skill_rows = [] + for sk in 
skills: + tree = gs.list_skill_files(source, sk["name"]) + skill_rows.append({ + "name": sk["name"], + "display_name": sk.get("display_name", sk["name"]), + "file_count": _count_files_in_skill_tree(tree), + }) + sources_out.append({ + "source": source, + "label": src.get("name", source), + "path": src.get("path", ""), + "skill_count": len(skill_rows), + "skills": skill_rows, + }) + return sources_out + + +def _policy_files(agent_name: str) -> dict[str, bool]: + base = Path(AGENTS_DIR) / resolve_agent_dir(agent_name) + names = ("TOOLS.md", "AGENTS.md", "BOOTSTRAP.md") + return {n: (base / n).is_file() for n in names} + + +def _scan_text_file(path: Path) -> list[dict[str, Any]]: + """Return rule hits for one file (line-level, capped).""" + hits: list[dict[str, Any]] = [] + try: + raw = path.read_text(encoding="utf-8", errors="replace") + except OSError: + return hits + if len(raw) > _MAX_SCAN_BYTES: + raw = raw[:_MAX_SCAN_BYTES] + seen: set[tuple[int, str]] = set() + for line_num, line in enumerate(raw.splitlines(), start=1): + if line_num > _MAX_SCAN_LINES: + break + if len(hits) >= _MAX_HITS_PER_FILE: + break + for pattern, rule_id, severity in _CONTENT_RULES: + if pattern.search(line): + key = (line_num, rule_id) + if key in seen: + continue + seen.add(key) + hits.append({ + "rule": rule_id, + "severity": severity, + "line": line_num, + "preview": line.strip()[:240], + }) + if len(hits) >= _MAX_HITS_PER_FILE: + break + return hits + + +def _session_jsonl_candidates(agent_name: str) -> list[Path]: + out: list[Path] = [] + seen: set[str] = set() + resolved = resolve_agent_dir(agent_name) + agent_short = resolved.replace("workspace-", "") + bases = [ + # Workspace-style session paths + Path(AGENTS_DIR) / resolved / "sessions", + Path(AGENTS_DIR) / f"workspace-{agent_short}" / "sessions", + # Agent registry-style paths (used by status service) + Path(AGENTS_DIR) / "agents" / agent_short / "sessions", + Path(AGENTS_DIR) / "agents" / resolved / "sessions", + # Host 
mounted session data paths + Path(SESSION_DATA_DIR) / agent_short / "sessions", + Path(SESSION_DATA_DIR) / "agents" / agent_short / "sessions", + ] + for base in bases: + if not base.is_dir(): + continue + for fp in sorted(base.glob("*.jsonl"), key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True): + if fp.name.endswith(".deleted.jsonl") or fp.name.endswith(".lock"): + continue + key = str(fp.resolve()) + if key in seen: + continue + seen.add(key) + out.append(fp) + if len(out) >= _MAX_SESSION_FILES: + return out + return out + + +def _stringify_session_message_line(line_obj: dict[str, Any]) -> tuple[str, str, str]: + msg = line_obj.get("message") or line_obj + role = str(msg.get("role") or line_obj.get("role") or "unknown") + ts = str(line_obj.get("timestamp") or msg.get("timestamp") or "") + content = msg.get("content", "") + if isinstance(content, str): + return role, ts, content + if isinstance(content, list): + parts: list[str] = [] + for block in content: + if isinstance(block, str): + parts.append(block) + elif isinstance(block, dict): + bt = str(block.get("type") or "text") + if bt == "text": + parts.append(str(block.get("text") or "")) + elif bt == "toolCall": + args = block.get("arguments") + args_text = args if isinstance(args, str) else json.dumps(args, ensure_ascii=False) + parts.append(f"[toolCall {block.get('name', 'unknown')}] {args_text}") + elif bt == "toolResult": + t = block.get("text", block.get("content", "")) + parts.append(str(t if isinstance(t, str) else json.dumps(t, ensure_ascii=False))) + elif bt == "thinking": + parts.append(str(block.get("thinking") or "")) + else: + parts.append(str(block.get("text") or json.dumps(block, ensure_ascii=False))) + return role, ts, "\n".join([p for p in parts if p]).strip() + return role, ts, str(content) + + +def _infer_session_risk_with_embedded_model(text: str) -> list[dict[str, Any]]: + """Infer risk intents using weighted feature co-occurrence (mini embedded model).""" + if not text: + 
return [] + scores: dict[str, int] = { + "SESSION_SECRET_EXPOSURE": 0, + "SESSION_SUPPLY_CHAIN_INSTALL": 0, + "SESSION_PRIVILEGE_ESCALATION": 0, + "SESSION_REMOTE_EXEC_PIPE": 0, + "SESSION_WARNING_BYPASS_REQUEST": 0, + } + signals: dict[str, list[str]] = {k: [] for k in scores.keys()} + + def add(code: str, n: int, sig: str): + scores[code] += n + if sig not in signals[code]: + signals[code].append(sig) + + for feat, (pattern, weight) in _SESSION_MODEL_FEATURES.items(): + m = pattern.search(text) + if not m: + continue + token = m.group(0)[:80] + if feat in {"secret_literal", "secret_assignment", "sensitive_share"}: + add("SESSION_SECRET_EXPOSURE", weight, token) + if feat in {"install_action", "plugin_skill_target", "external_pkg_source"}: + add("SESSION_SUPPLY_CHAIN_INSTALL", weight, token) + if feat == "privilege_cmd": + add("SESSION_PRIVILEGE_ESCALATION", weight, token) + if feat == "remote_exec_pipe": + add("SESSION_REMOTE_EXEC_PIPE", weight, token) + if feat == "warning_bypass": + add("SESSION_WARNING_BYPASS_REQUEST", weight, token) + + out: list[dict[str, Any]] = [] + for code, score in scores.items(): + if score <= 0: + continue + sev = "warning" if score >= 5 else "info" + out.append({ + "rule": code, + "severity": sev, + "score": score, + "signals": signals.get(code, [])[:5], + "matched_text": (signals.get(code, [""])[0] or "")[:180], + }) + # Keep strongest intents first. 
+ out.sort(key=lambda x: (0 if x["severity"] == "warning" else 1, -x["score"], x["rule"])) + return out + + +def _detect_session_risk(text: str) -> dict[str, Any] | None: + """Blend precise regex rules + embedded feature model.""" + candidates: list[dict[str, Any]] = [] + for pattern, rule_id, severity in _SESSION_RISK_RULES: + m = pattern.search(text or "") + if not m: + continue + candidates.append({ + "rule": rule_id, + "severity": severity, + "score": 10 if severity == "warning" else 6, + "matched_text": m.group(0)[:180], + "signals": [m.group(0)[:80]], + }) + candidates.extend(_infer_session_risk_with_embedded_model(text or "")) + if not candidates: + return None + candidates.sort(key=lambda x: (0 if x["severity"] == "warning" else 1, -int(x.get("score", 0)), x["rule"])) + return candidates[0] + + +def _scan_session_risks(agent_names: list[str]) -> dict[str, Any]: + hits: list[dict[str, Any]] = [] + scanned_files = 0 + for agent_name in agent_names: + # Align with Agents page session source: + # 1) read sessions via status service + # 2) read transcript via status session messages API path + resolved_agent = resolve_agent_dir(agent_name) + if agent_name == "workspace-main" or resolved_agent == "workspace": + agent_short = "main" + elif resolved_agent.startswith("workspace-"): + agent_short = resolved_agent.replace("workspace-", "", 1) + else: + agent_short = resolved_agent + status_agent_candidates = [ + resolved_agent, + f"agents/{agent_short}", + agent_short, + agent_name, + ] + session_rows: list[dict[str, Any]] = [] + chosen_status_agent = None + for cand in status_agent_candidates: + try: + st = status_service.get_agent_status(cand) + rows = st.get("sessions") or [] + if rows: + session_rows = rows + chosen_status_agent = cand + break + except Exception: + continue + + scanned_with_status = False + for sess in session_rows: + sid = str(sess.get("session_id") or "").strip() + if not sid: + continue + scanned_files += 1 + scanned_with_status = True + try: + 
msg_page = status_service.get_session_messages(chosen_status_agent or resolved_agent, sid, offset=0, limit=200) + msgs = msg_page.get("messages") or [] + except Exception: + msgs = [] + for msg in msgs: + if len(hits) >= _MAX_SESSION_RISK_HITS: + break + # status service returns parsed content blocks + parts: list[str] = [] + for b in (msg.get("content") or []): + bt = str((b or {}).get("type") or "") + if bt == "text": + parts.append(str((b or {}).get("text") or "")) + elif bt == "toolCall": + parts.append(f"[toolCall {(b or {}).get('name', 'unknown')}] {str((b or {}).get('arguments') or '')}") + elif bt == "toolResult": + parts.append(str((b or {}).get("text") or "")) + elif bt == "thinking": + parts.append(str((b or {}).get("text") or "")) + text = "\n".join([p for p in parts if p]).strip() + if not text: + continue + role = str(msg.get("role") or "unknown") + ts = str(msg.get("timestamp") or "") + detected = _detect_session_risk(text) + if not detected: + continue + needle = detected.get("matched_text") or "" + idx = text.lower().find(str(needle).lower()) if needle else -1 + if idx < 0: + idx = 0 + st = max(0, idx - _SESSION_RISK_CONTEXT_RADIUS) + ed = min(len(text), idx + max(len(str(needle)), 1) + _SESSION_RISK_CONTEXT_RADIUS) + snippet = text[st:ed].strip().replace("\r\n", "\n") + hits.append({ + "severity": detected["severity"], + "rule": detected["rule"], + "risk_score": detected.get("score", 0), + "model_signals": detected.get("signals", []), + "agent_name": agent_name, + "session_file": f"{sid}.jsonl", + "timestamp": ts or None, + "role": role, + "matched_text": str(needle)[:180], + "context": snippet[:900], + }) + + # Fallback: direct jsonl scan if status pipeline finds nothing + if scanned_with_status: + continue + for fp in _session_jsonl_candidates(agent_name): + if len(hits) >= _MAX_SESSION_RISK_HITS: + break + scanned_files += 1 + try: + with fp.open("r", encoding="utf-8", errors="replace") as f: + lines = f.readlines() + except OSError: + continue 
+ if len(lines) > _MAX_SESSION_LINES_PER_FILE: + lines = lines[-_MAX_SESSION_LINES_PER_FILE:] + for raw in lines: + if len(hits) >= _MAX_SESSION_RISK_HITS: + break + s = raw.strip() + if not s: + continue + try: + entry = json.loads(s) + except Exception: + continue + role, ts, text = _stringify_session_message_line(entry) + if not text: + continue + detected = _detect_session_risk(text) + if not detected: + continue + needle = detected.get("matched_text") or "" + idx = text.lower().find(str(needle).lower()) if needle else -1 + if idx < 0: + idx = 0 + st = max(0, idx - _SESSION_RISK_CONTEXT_RADIUS) + ed = min(len(text), idx + max(len(str(needle)), 1) + _SESSION_RISK_CONTEXT_RADIUS) + snippet = text[st:ed].strip().replace("\r\n", "\n") + hits.append({ + "severity": detected["severity"], + "rule": detected["rule"], + "risk_score": detected.get("score", 0), + "model_signals": detected.get("signals", []), + "agent_name": agent_name, + "session_file": fp.name, + "timestamp": ts or None, + "role": role, + "matched_text": str(needle)[:180], + "context": snippet[:900], + }) + warning = sum(1 for h in hits if h["severity"] == "warning") + info = sum(1 for h in hits if h["severity"] != "warning") + return { + "scanned_files": scanned_files, + "total_hits": len(hits), + "warning_hits": warning, + "info_hits": info, + "hits": hits, + } + + +def _scan_agent_content_files(agent_name: str) -> tuple[list[dict[str, Any]], bool]: + """Scan TOOLS.md, AGENTS.md, skills/*/SKILL.md. 
Returns (signals, missing_tools_md).""" + base = Path(AGENTS_DIR) / resolve_agent_dir(agent_name) + signals: list[dict[str, Any]] = [] + missing_tools = not (base / "TOOLS.md").is_file() + + files_to_scan: list[tuple[Path, str]] = [] + for n in ("TOOLS.md", "AGENTS.md"): + p = base / n + if p.is_file(): + files_to_scan.append((p, n)) + + skills_dir = base / "skills" + if skills_dir.is_dir(): + for sd in sorted(skills_dir.iterdir()): + if not sd.is_dir(): + continue + sm = sd / "SKILL.md" + if sm.is_file(): + files_to_scan.append((sm, f"skills/{sd.name}/SKILL.md")) + + for fp, rel in files_to_scan: + for h in _scan_text_file(fp): + signals.append({ + "scope": "agent", + "agent_name": agent_name, + "global_source": None, + "path": rel, + **h, + }) + return signals, missing_tools + + +def _global_skill_skill_md_path(source: str, skill_name: str) -> Path | None: + if source == "shared": + p = Path(SHARED_SKILLS_DIR) / skill_name / "SKILL.md" + elif source == "global": + p = Path(GLOBAL_SKILLS_DIR) / skill_name / "SKILL.md" + else: + return None + return p if p.is_file() else None + + +def _scan_global_skill_content() -> list[dict[str, Any]]: + out: list[dict[str, Any]] = [] + for src in gs.list_sources(): + source = src["source"] + for sk in gs.list_skills(source): + name = sk["name"] + fp = _global_skill_skill_md_path(source, name) + if fp is None: + continue + rel = f"{name}/SKILL.md" + for h in _scan_text_file(fp): + out.append({ + "scope": "global_skill", + "agent_name": None, + "global_source": source, + "path": rel, + **h, + }) + return out + + +def _collect_all_content_signals(agent_names: list[str]) -> list[dict[str, Any]]: + """Sync: scan agents + global skills; cap total rows.""" + all_hits: list[dict[str, Any]] = [] + for an in agent_names: + sigs, _ = _scan_agent_content_files(an) + all_hits.extend(sigs) + all_hits.extend(_scan_global_skill_content()) + return all_hits[:_MAX_TOTAL_SIGNALS] + + +def _external_sources() -> dict[str, str]: + """Reference links 
used in security center."""
+    return {
+        "clawhub": "https://clawhub.ai/",
+        "openclaw_configuration": "https://docs.openclaw.ai/gateway/configuration-reference",
+        "virustotal": "https://www.virustotal.com/",
+        "osv": "https://osv.dev/",
+    }
+
+
+def _skill_clawhub_url(skill_name: str) -> str:
+    # Keep a stable fallback only; search query URL format may change.
+    return "https://clawhub.ai/"
+
+
+def _extract_links_from_text(text: str) -> list[str]:
+    urls = re.findall(r"https?://[^\s)>\]\"]+", text)
+    # common git "owner/repo" shorthand
+    github_short = re.findall(r"\b([A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+)\b", text)
+    for item in github_short:
+        # best-effort: require exactly one slash; path-like strings (e.g. "skills/demo") can still slip through
+        if item.count("/") == 1 and not item.startswith(("http://", "https://")):
+            urls.append(f"https://github.com/{item}")
+    out: list[str] = []
+    seen: set[str] = set()
+    for u in urls:
+        nu = u.strip().rstrip(".,);")
+        if not nu or nu in seen:
+            continue
+        seen.add(nu)
+        out.append(nu)
+    return out[:8]
+
+
+def _extract_skill_source_links(scope: str, agent_name: str | None, global_source: str | None, skill_name: str) -> dict[str, Any]:
+    """Extract canonical links from SKILL.md content; fallback to ClawHub home."""
+    skill_md: Path | None = None
+    if scope == "agent" and agent_name:
+        skill_md = Path(AGENTS_DIR) / resolve_agent_dir(agent_name) / "skills" / skill_name / "SKILL.md"
+    elif scope == "global_skill" and global_source:
+        skill_md = _global_skill_skill_md_path(global_source, skill_name)
+
+    links: list[str] = []
+    if skill_md and skill_md.is_file():
+        try:
+            text = skill_md.read_text(encoding="utf-8", errors="replace")
+            links = _extract_links_from_text(text)
+        except OSError:
+            links = []
+
+    # Prefer GitHub/npm/homepage-like URLs if present.
+ preferred = None + for u in links: + ul = u.lower() + if "github.com" in ul or "npmjs.com" in ul or "gitlab.com" in ul: + preferred = u + break + if preferred is None and links: + preferred = links[0] + + if preferred is None: + preferred = _skill_clawhub_url(skill_name) + + return { + "primary_link": preferred, + "source_links": links, + } + + +def _build_skill_supply_audit( + agents_out: list[dict[str, Any]], + global_sources: list[dict[str, Any]], + content_signals: list[dict[str, Any]], + virustotal: dict[str, Any], +) -> dict[str, Any]: + """Aggregate audit rows by installed skill (agent/global/shared).""" + signal_index: dict[tuple[str, str | None, str | None, str], dict[str, int]] = {} + for s in content_signals: + scope = s.get("scope") + path = str(s.get("path") or "") + skill_name = None + if scope == "agent" and path.startswith("skills/") and path.endswith("/SKILL.md"): + parts = path.split("/") + if len(parts) >= 3: + skill_name = parts[1] + elif scope == "global_skill" and path.endswith("/SKILL.md"): + skill_name = path.split("/", 1)[0] + if not skill_name: + continue + key = (scope, s.get("agent_name"), s.get("global_source"), skill_name) + slot = signal_index.setdefault(key, {"warning": 0, "info": 0, "total": 0}) + sev = "warning" if s.get("severity") == "warning" else "info" + slot[sev] += 1 + slot["total"] += 1 + + vt_index: dict[tuple[str, str | None, str | None, str], dict[str, Any]] = {} + for h in (virustotal.get("hits") or []): + scope = h.get("scope") + path = str(h.get("path") or "") + skill_name = None + if scope == "agent" and path.startswith("skills/") and path.endswith("/SKILL.md"): + parts = path.split("/") + if len(parts) >= 3: + skill_name = parts[1] + elif scope == "global_skill" and path.endswith("/SKILL.md"): + skill_name = path.split("/", 1)[0] + if not skill_name: + continue + key = (scope, h.get("agent_name"), h.get("global_source"), skill_name) + vt_index[key] = { + "malicious": int(h.get("malicious") or 0), + "suspicious": 
int(h.get("suspicious") or 0), + "link": h.get("link"), + "sha256": h.get("sha256"), + } + + local_rows: list[dict[str, Any]] = [] + for a in agents_out: + for sk in a.get("local_skills") or []: + key = ("agent", a.get("name"), None, sk["name"]) + sig = signal_index.get(key, {"warning": 0, "info": 0, "total": 0}) + vt = vt_index.get(key) + src_links = _extract_skill_source_links("agent", a.get("name"), None, sk["name"]) + local_rows.append({ + "scope_level": "agent_local", + "agent_name": a.get("name"), + "global_source": None, + "skill_name": sk["name"], + "display_name": sk.get("display_name", sk["name"]), + "file_count": sk.get("file_count", 0), + "signal_warning": sig["warning"], + "signal_info": sig["info"], + "signal_total": sig["total"], + "vt": vt, + "primary_link": src_links["primary_link"], + "source_links": src_links["source_links"], + }) + + global_rows: list[dict[str, Any]] = [] + for src in global_sources: + source = src.get("source") + for sk in src.get("skills") or []: + key = ("global_skill", None, source, sk["name"]) + sig = signal_index.get(key, {"warning": 0, "info": 0, "total": 0}) + vt = vt_index.get(key) + src_links = _extract_skill_source_links("global_skill", None, source, sk["name"]) + global_rows.append({ + "scope_level": "global_or_shared", + "agent_name": None, + "global_source": source, + "skill_name": sk["name"], + "display_name": sk.get("display_name", sk["name"]), + "file_count": sk.get("file_count", 0), + "signal_warning": sig["warning"], + "signal_info": sig["info"], + "signal_total": sig["total"], + "vt": vt, + "primary_link": src_links["primary_link"], + "source_links": src_links["source_links"], + }) + + return { + "local_agent_skills": local_rows, + "global_or_shared_skills": global_rows, + } + + +def _sha256_of_file(path: Path) -> str | None: + try: + h = hashlib.sha256() + with path.open("rb") as f: + while True: + chunk = f.read(1024 * 256) + if not chunk: + break + h.update(chunk) + return h.hexdigest() + except OSError: + 
return None + + +def _collect_virustotal_targets(agent_names: list[str]) -> list[dict[str, Any]]: + """Collect key files for VT hash lookups.""" + targets: list[dict[str, Any]] = [] + seen: set[str] = set() + for an in agent_names: + base = Path(AGENTS_DIR) / resolve_agent_dir(an) + for rel in ("TOOLS.md", "AGENTS.md"): + fp = base / rel + if not fp.is_file(): + continue + key = str(fp.resolve()) + if key in seen: + continue + seen.add(key) + targets.append({ + "scope": "agent", + "agent_name": an, + "global_source": None, + "path": rel, + "abs_path": fp, + }) + skills_dir = base / "skills" + if skills_dir.is_dir(): + for sd in sorted(skills_dir.iterdir()): + sm = sd / "SKILL.md" + if not sm.is_file(): + continue + key = str(sm.resolve()) + if key in seen: + continue + seen.add(key) + targets.append({ + "scope": "agent", + "agent_name": an, + "global_source": None, + "path": f"skills/{sd.name}/SKILL.md", + "abs_path": sm, + }) + for src in gs.list_sources(): + source = src["source"] + for sk in gs.list_skills(source): + fp = _global_skill_skill_md_path(source, sk["name"]) + if fp is None: + continue + key = str(fp.resolve()) + if key in seen: + continue + seen.add(key) + targets.append({ + "scope": "global_skill", + "agent_name": None, + "global_source": source, + "path": f"{sk['name']}/SKILL.md", + "abs_path": fp, + }) + return targets[:_MAX_VT_FILES] + + +async def _fetch_virustotal(agent_names: list[str]) -> dict[str, Any]: + key = (os.environ.get("VIRUSTOTAL_API_KEY") or "").strip() + targets = _collect_virustotal_targets(agent_names) + if not key: + return { + "enabled": False, + "reason": "VIRUSTOTAL_API_KEY not configured", + "scanned_files": 0, + "hits": [], + } + if not targets: + return {"enabled": True, "scanned_files": 0, "hits": []} + + headers = {"x-apikey": key} + hits: list[dict[str, Any]] = [] + scanned = 0 + async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SECONDS) as client: + for t in targets: + sha256 = _sha256_of_file(t["abs_path"]) + if not 
sha256: + continue + scanned += 1 + url = f"https://www.virustotal.com/api/v3/files/{sha256}" + try: + resp = await client.get(url, headers=headers) + except Exception: + continue + if resp.status_code == 404: + continue + if resp.status_code != 200: + continue + try: + data = resp.json().get("data", {}) + stats = (data.get("attributes", {}) or {}).get("last_analysis_stats", {}) or {} + malicious = int(stats.get("malicious", 0) or 0) + suspicious = int(stats.get("suspicious", 0) or 0) + except Exception: + continue + if malicious <= 0 and suspicious <= 0: + continue + hits.append({ + "scope": t["scope"], + "agent_name": t["agent_name"], + "global_source": t["global_source"], + "path": t["path"], + "sha256": sha256, + "malicious": malicious, + "suspicious": suspicious, + "link": f"https://www.virustotal.com/gui/file/{sha256}", + }) + return { + "enabled": True, + "scanned_files": scanned, + "hits": hits, + } + + +def _frontend_direct_npm_dependencies() -> list[dict[str, str]]: + lock = Path(__file__).resolve().parents[3] / "frontend" / "package-lock.json" + if not lock.is_file(): + return [] + try: + data = json.loads(lock.read_text(encoding="utf-8", errors="replace")) + except Exception: + return [] + root_pkg = ((data.get("packages") or {}).get("") or {}) + root_deps = root_pkg.get("dependencies") or {} + out: list[dict[str, str]] = [] + for name in sorted(root_deps.keys()): + node = ((data.get("packages") or {}).get(f"node_modules/{name}") or {}) + version = (node.get("version") or "").strip() + if not version: + continue + out.append({"name": name, "version": version}) + if len(out) >= _MAX_OSV_DIRECT_DEPS: + break + return out + + +def _deps_from_package_json(path: Path) -> list[dict[str, str]]: + """Collect concrete npm deps from one package.json (only exact versions).""" + if not path.is_file(): + return [] + try: + data = json.loads(path.read_text(encoding="utf-8", errors="replace")) + except Exception: + return [] + out: list[dict[str, str]] = [] + for sec 
in ("dependencies", "optionalDependencies"): + block = data.get(sec) or {} + if not isinstance(block, dict): + continue + for name, version in block.items(): + v = str(version or "").strip() + # OSV version query prefers concrete versions (skip ranges/tags) + if not v or any(ch in v for ch in "^~*><=| "): + continue + out.append({"name": str(name), "version": v}) + return out + + +def _skill_direct_npm_dependencies() -> dict[str, list[dict[str, str]]]: + """Collect npm deps from global/shared skill package.json files.""" + out: dict[str, list[dict[str, str]]] = {} + for source_key, base in (("global_skills", Path(GLOBAL_SKILLS_DIR)), ("shared_skills", Path(SHARED_SKILLS_DIR))): + rows: list[dict[str, str]] = [] + if not base.is_dir(): + out[source_key] = rows + continue + for sk in sorted(base.iterdir()): + if not sk.is_dir(): + continue + pkg = sk / "package.json" + for dep in _deps_from_package_json(pkg): + rows.append({ + "name": dep["name"], + "version": dep["version"], + "component": sk.name, + "scope": source_key, + }) + if len(rows) >= _MAX_OSV_DIRECT_DEPS: + break + if len(rows) >= _MAX_OSV_DIRECT_DEPS: + break + out[source_key] = rows + return out + + +def _openclaw_runtime_npm_dependencies() -> list[dict[str, str]]: + """Best-effort: collect deps near openclaw.json (if a package.json exists there).""" + cfg = Path(OPENCLAW_CONFIG_PATH) + candidates = [ + cfg.parent / "package.json", + cfg.parent.parent / "package.json" if cfg.parent.parent else None, + ] + rows: list[dict[str, str]] = [] + seen: set[str] = set() + for c in candidates: + if not c or not c.is_file(): + continue + for dep in _deps_from_package_json(c): + k = f"{dep['name']}@{dep['version']}" + if k in seen: + continue + seen.add(k) + rows.append({ + "name": dep["name"], + "version": dep["version"], + "component": str(c.parent), + "scope": "openclaw_runtime", + }) + if len(rows) >= _MAX_OSV_DIRECT_DEPS: + return rows + return rows + + +def _collect_npm_dependency_sets() -> dict[str, 
list[dict[str, str]]]: + return { + "frontend": _frontend_direct_npm_dependencies(), + "openclaw_runtime": _openclaw_runtime_npm_dependencies(), + **_skill_direct_npm_dependencies(), + } + + +async def _fetch_osv_npm_vulns() -> dict[str, Any]: + dep_sets = _collect_npm_dependency_sets() + deps = [] + for scope, rows in dep_sets.items(): + for d in rows: + deps.append({ + "name": d["name"], + "version": d["version"], + "scope": scope, + "component": d.get("component"), + }) + deps = deps[:_MAX_OSV_QUERY_PACKAGES] + if not deps: + return {"checked_packages": 0, "vulnerabilities": [], "checked_by_scope": {}} + queries = [ + {"package": {"name": d["name"], "ecosystem": "npm"}, "version": d["version"]} + for d in deps + ] + payload = {"queries": queries} + vulns: list[dict[str, Any]] = [] + try: + async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SECONDS) as client: + resp = await client.post("https://api.osv.dev/v1/querybatch", json=payload) + if resp.status_code != 200: + return {"checked_packages": len(deps), "vulnerabilities": [], "checked_by_scope": {}} + results = (resp.json() or {}).get("results") or [] + except Exception: + return {"checked_packages": len(deps), "vulnerabilities": [], "checked_by_scope": {}} + + for idx, item in enumerate(results): + ds = deps[idx] + for v in (item.get("vulns") or []): + aliases = v.get("aliases") or [] + fixed = None + for aff in (v.get("affected") or []): + for r in (aff.get("ranges") or []): + for ev in (r.get("events") or []): + if ev.get("fixed"): + fixed = ev.get("fixed") + break + if fixed: + break + if fixed: + break + vulns.append({ + "package": ds["name"], + "version": ds["version"], + "scope": ds.get("scope"), + "component": ds.get("component"), + "id": v.get("id"), + "aliases": aliases[:6], + "summary": (v.get("summary") or "")[:220] or None, + "details": (v.get("details") or "")[:320] or None, + "fixed_version": fixed, + "link": f"https://osv.dev/vulnerability/{v.get('id')}" if v.get("id") else "https://osv.dev/", 
+ }) + checked_by_scope: dict[str, int] = {} + for d in deps: + sc = d.get("scope") or "unknown" + checked_by_scope[sc] = checked_by_scope.get(sc, 0) + 1 + return {"checked_packages": len(deps), "vulnerabilities": vulns, "checked_by_scope": checked_by_scope} + + +def _llm_flags() -> dict[str, Any]: + from .config import read_config + + cfg = read_config() + llm = cfg.get("llm", {}) + default = llm.get("default", {}) + overrides = llm.get("overrides") or {} + purposes = [] + for purpose, oc in overrides.items(): + if isinstance(oc, dict) and (oc.get("api_key") or "").strip(): + purposes.append(purpose) + return { + "default_has_api_key": bool((default.get("api_key") or "").strip()), + "override_purposes_with_api_key": sorted(purposes), + "data_dir": str(DATA_DIR), + } + + +def _build_findings( + cors_all: bool, + gateway_token_set: bool, + secret_var_count: int, +) -> list[dict[str, str]]: + findings: list[dict[str, str]] = [] + if cors_all: + findings.append({ + "severity": "warning", + "code": "CORS_ALLOW_ALL", + "message": "ALLOWED_ORIGINS is * — any origin can call this API from a browser.", + }) + if not gateway_token_set: + findings.append({ + "severity": "info", + "code": "GATEWAY_TOKEN_UNSET", + "message": "GATEWAY_TOKEN is empty — outbound gateway calls may fail or be rejected.", + }) + if secret_var_count > 0: + findings.append({ + "severity": "info", + "code": "SECRET_VARIABLES_PRESENT", + "message": f"{secret_var_count} variable(s) marked as type secret (values never shown in this report).", + }) + return findings + + +async def build_audit_report() -> dict[str, Any]: + """Aggregate read-only audit data for the Security Center.""" + generated_at = datetime.now(timezone.utc).isoformat() + + cors_all, origin_count = _cors_allow_all() + gateway_token_set = bool((GATEWAY_TOKEN or "").strip()) + + openclaw_path = Path(OPENCLAW_CONFIG_PATH) + openclaw = _openclaw_preview(openclaw_path) + + agents_raw = await scanner.list_agents_async() + agents_out: 
list[dict[str, Any]] = [] + for a in agents_raw: + name = a["name"] + agent_id = await version_db.get_or_create_agent(name) + derivation = await version_db.get_derivation_by_agent_id(agent_id) + bp_name = None + if derivation: + bp = await version_db.get_blueprint(derivation["blueprint_id"]) + bp_name = bp["name"] if bp else None + + skill_rows = await _agent_skill_rows(name) + total_files = sum(s["file_count"] for s in skill_rows) + + agents_out.append({ + "name": name, + "display_name": a.get("display_name"), + "id": agent_id, + "blueprint_name": bp_name, + "host_path": a.get("host_path"), + "policy_files": _policy_files(name), + "local_skills": skill_rows, + "local_skill_count": len(skill_rows), + "local_skill_file_total": total_files, + }) + + variables_raw = await version_db.list_variables() + masked = [variable_service.mask_variable(dict(v)) for v in variables_raw] + by_type: dict[str, int] = {} + by_scope: dict[str, int] = {} + for v in variables_raw: + t = v.get("type") or "text" + by_type[t] = by_type.get(t, 0) + 1 + sc = v.get("scope") or "global" + by_scope[sc] = by_scope.get(sc, 0) + 1 + secret_count = sum(1 for v in variables_raw if v.get("type") == "secret") + + var_entries = [] + for v in masked: + var_entries.append({ + "id": v["id"], + "name": v["name"], + "type": v.get("type"), + "scope": v.get("scope"), + "agent_id": v.get("agent_id"), + "description": (v.get("description") or "")[:200] or None, + }) + + global_sources = await _global_source_rows() + + agent_name_list = [a["name"] for a in agents_out] + content_signals = await asyncio.to_thread(_collect_all_content_signals, agent_name_list) + session_risks = await asyncio.to_thread(_scan_session_risks, agent_name_list) + virustotal = await _fetch_virustotal(agent_name_list) + osv_npm = await _fetch_osv_npm_vulns() + skill_supply_audit = _build_skill_supply_audit( + agents_out=agents_out, + global_sources=global_sources, + content_signals=content_signals, + virustotal=virustotal, + ) + + 
agents_missing_tools = sum( + 1 for a in agents_out if not a["policy_files"].get("TOOLS.md", False) + ) + content_warning_hits = sum(1 for s in content_signals if s.get("severity") == "warning") + + findings = _build_findings(cors_all, gateway_token_set, secret_count) + + if ( + openclaw.get("parsed") + and openclaw.get("agents_registry_declared") + and openclaw.get("agents_in_registry_count", 0) != len(agents_out) + ): + findings.append({ + "severity": "info", + "code": "OPENCLAW_REGISTRY_COUNT_MISMATCH", + "message": ( + f"openclaw.json agents.list has {openclaw['agents_in_registry_count']} entries, " + f"but this dashboard scanned {len(agents_out)} workspace(s). " + "This may be normal if workspaces are not all registered." + ), + }) + if agents_missing_tools > 0: + findings.append({ + "severity": "warning", + "code": "MISSING_TOOLS_MD", + "message": f"{agents_missing_tools} agent workspace(s) have no TOOLS.md at the workspace root.", + }) + if content_warning_hits > 0: + findings.append({ + "severity": "info", + "code": "CONTENT_HEURISTIC_HITS", + "message": ( + f"{content_warning_hits} heuristic match(es) with severity 'warning' in scanned markdown " + "(see Content scan). Review context; not all matches are vulnerabilities." + ), + }) + if (session_risks.get("warning_hits") or 0) > 0: + findings.append({ + "severity": "warning", + "code": "SESSION_SENSITIVE_ACTIONS", + "message": ( + f"Detected {session_risks['warning_hits']} high-risk session action(s) " + "(privilege escalation, secrets exposure, plugin/skill installs)." 
+ ), + }) + if (session_risks.get("info_hits") or 0) > 0: + findings.append({ + "severity": "info", + "code": "SESSION_SECURITY_SIGNALS", + "message": f"Detected {session_risks['info_hits']} additional session security signal(s).", + }) + if (virustotal.get("hits") or []): + findings.append({ + "severity": "warning", + "code": "VIRUSTOTAL_HITS", + "message": f"VirusTotal reported suspicious/malicious results for {len(virustotal['hits'])} scanned file(s).", + }) + if (osv_npm.get("vulnerabilities") or []): + by_scope = osv_npm.get("checked_by_scope") or {} + scope_summary = ", ".join(f"{k}={v}" for k, v in sorted(by_scope.items())) or "n/a" + findings.append({ + "severity": "warning", + "code": "NPM_KNOWN_VULNERABILITIES", + "message": ( + f"OSV found {len(osv_npm['vulnerabilities'])} known vulnerability record(s) " + f"across npm dependency scopes ({scope_summary})." + ), + }) + if any(v.get("scope") in {"global_skills", "shared_skills"} for v in (osv_npm.get("vulnerabilities") or [])): + findings.append({ + "severity": "warning", + "code": "SKILL_NPM_VULNERABILITIES", + "message": "Known npm vulnerabilities were detected in global/shared skill dependencies.", + }) + if any(v.get("scope") == "openclaw_runtime" for v in (osv_npm.get("vulnerabilities") or [])): + findings.append({ + "severity": "warning", + "code": "OPENCLAW_RUNTIME_NPM_VULNERABILITIES", + "message": "Known npm vulnerabilities were detected near OpenClaw runtime dependencies.", + }) + malware_like = 0 + for v in (osv_npm.get("vulnerabilities") or []): + text = f"{v.get('summary') or ''} {v.get('details') or ''}".lower() + if any(k in text for k in ("malware", "backdoor", "trojan", "credential theft", "exfiltrat", "typosquat")): + malware_like += 1 + if malware_like > 0: + findings.append({ + "severity": "warning", + "code": "NPM_POSSIBLE_MALICIOUS_PACKAGES", + "message": f"{malware_like} npm vulnerability record(s) include malware-like indicators; review package provenance immediately.", + }) + + for 
rec in openclaw.get("security_recommendations") or []: + if rec.get("severity") == "warning": + rid = rec.get("id", "UNKNOWN") + code = rid if str(rid).startswith("OPENCLAW_") else f"OPENCLAW_{rid}" + findings.append({ + "severity": "warning", + "code": code, + "message": f"openclaw.json: {rid} — see OpenClaw security recommendations below.", + }) + + findings.sort(key=lambda f: (0 if f.get("severity") == "warning" else 1, f.get("code", ""))) + + fw = sum(1 for f in findings if f.get("severity") == "warning") + fi = sum(1 for f in findings if f.get("severity") == "info") + cw = sum(1 for s in content_signals if s.get("severity") == "warning") + ci = sum(1 for s in content_signals if s.get("severity") == "info") + + return { + "generated_at": generated_at, + "summary": { + "agents_scanned": len(agents_out), + "global_skill_sources": len(global_sources), + "findings_warning": fw, + "findings_info": fi, + "content_hits_total": len(content_signals), + "content_hits_warning": cw, + "content_hits_info": ci, + "agents_missing_tools_md": agents_missing_tools, + }, + "dashboard": { + "cors_allow_all": cors_all, + "allowed_origins_count": origin_count, + "gateway_url": GATEWAY_URL, + "gateway_token_configured": gateway_token_set, + "agents_dir": AGENTS_DIR, + "global_skills_dir": GLOBAL_SKILLS_DIR, + "shared_skills_dir": SHARED_SKILLS_DIR, + "openclaw_config": openclaw, + }, + "llm_settings": _llm_flags(), + "external_intel": { + "sources": _external_sources(), + "clawhub": { + "source_url": "https://clawhub.ai/", + "notes": "Use ClawHub search to verify skill provenance/version context manually.", + }, + "virustotal": virustotal, + "osv_npm": osv_npm, + }, + "agents": agents_out, + "global_skill_sources": global_sources, + "skill_supply_audit": skill_supply_audit, + "content_signals": content_signals, + "session_risks": session_risks, + "variables": { + "total": len(variables_raw), + "by_type": by_type, + "by_scope": by_scope, + "entries": var_entries, + }, + "findings": 
findings, + } diff --git a/frontend/src/api/index.js b/frontend/src/api/index.js index e6c2fc3..558ff26 100644 --- a/frontend/src/api/index.js +++ b/frontend/src/api/index.js @@ -79,6 +79,9 @@ export function sendSessionMessage(agent, sessionKey, message, mode, envelopeCon }).then(r => r.data) } +// Security (read-only audit) +export const fetchSecurityAudit = () => api.get('/security/audit').then(r => r.data) + // Settings export const fetchSettings = () => api.get('/settings').then(r => r.data) export const updateSettings = (data) => api.put('/settings', data).then(r => r.data) diff --git a/frontend/src/components/AppLayout.vue b/frontend/src/components/AppLayout.vue index 0c25979..c9b98cd 100644 --- a/frontend/src/components/AppLayout.vue +++ b/frontend/src/components/AppLayout.vue @@ -30,6 +30,14 @@ {{ t('management.title') }} + + + {{ t('app.security') }} + @@ -66,6 +74,10 @@ {{ t('management.title') }} + + + {{ t('app.security') }} +
@@ -86,7 +98,7 @@ import { computed, ref, onMounted, watch } from 'vue' import { useRoute } from 'vue-router' import { useI18n } from 'vue-i18n' -import { Setting, DataLine, Monitor, Menu as MenuIcon, Close } from '@element-plus/icons-vue' +import { Setting, DataLine, Monitor, Menu as MenuIcon, Close, Lock } from '@element-plus/icons-vue' import { useSettingsStore } from '../stores/settings' import { useResponsive } from '../composables/useResponsive' @@ -98,6 +110,7 @@ const { isMobile } = useResponsive() const isDashboard = computed(() => route.path === '/dashboard') const isAgents = computed(() => route.path.startsWith('/agents')) const isManagement = computed(() => route.path.startsWith('/management')) +const isSecurity = computed(() => route.path.startsWith('/security')) const mobileMenuOpen = ref(false) @@ -184,7 +197,9 @@ function toggleLocale() { } .app-body { flex: 1; - overflow: hidden; + min-height: 0; + overflow-x: hidden; + overflow-y: auto; } /* Mobile header */ .mobile-menu-btn { diff --git a/frontend/src/components/BlueprintsPanel.vue b/frontend/src/components/BlueprintsPanel.vue index 7bf8023..c923c19 100644 --- a/frontend/src/components/BlueprintsPanel.vue +++ b/frontend/src/components/BlueprintsPanel.vue @@ -401,14 +401,13 @@ + +