diff --git a/.agents/sessions/2026-03-28-plugin-runtime-migration/handoff.md b/.agents/sessions/2026-03-28-plugin-runtime-migration/handoff.md new file mode 100644 index 00000000..c9f12bdd --- /dev/null +++ b/.agents/sessions/2026-03-28-plugin-runtime-migration/handoff.md @@ -0,0 +1,276 @@ +# Handoff + +## Phase 0: Planning + +**Status:** complete + +**Tasks completed:** + +- Planned the subprocess plugin runtime migration with tools and channels as + the first scope. +- Converted discovery questions into explicit implementation assumptions. +- Saved the approved session plan, task checklist, and handoff log. + +**Files changed:** + +- `.agents/sessions/2026-03-28-plugin-runtime-migration/plan.md` — approved + implementation plan +- `.agents/sessions/2026-03-28-plugin-runtime-migration/tasks.md` — phase + checklist +- `.agents/sessions/2026-03-28-plugin-runtime-migration/handoff.md` — session + handoff log initialized + +**Commits:** + +- None yet + +**Decisions & context for next phase:** + +- Tools and network channels are the first migration target. +- Providers and memory are explicitly deferred. +- `cli` remains builtin in this milestone. +- JS plugins remain supported during the migration. + +## Phase 1: Runtime Foundation + +**Status:** complete + +**Tasks completed:** + +- 1.1: Added a new `pluginapi` package with a versioned manifest schema, + protocol envelopes, handshake payloads, and capability constants. +- 1.2: Added a new `pluginhost` package with manifest loading, catalog + discovery, line-framed stdio protocol helpers, a plugin client, and a + restart-capable supervisor. +- 1.3: Added Anna home plugin path helpers and a builtin entrypoint token + (`@anna`) so first-party plugins can run as subprocesses of the current + binary in later phases. +- 1.4: Added focused tests for manifest validation, duplicate discovery, + handshake/health, and supervisor restart behavior. + +**Files changed:** + +- `internal/pluginapi/types.go` — protocol and manifest types +- `internal/pluginhost/manifest.go` — manifest loading and validation +- `internal/pluginhost/catalog.go` — filesystem discovery and catalog lookup +- `internal/pluginhost/protocol.go` — line-framed envelope IO helpers +- `internal/pluginhost/client.go` — subprocess lifecycle and request/response + client +- `internal/pluginhost/supervisor.go` — restart-capable host supervisor +- `internal/pluginhost/catalog_test.go` — catalog discovery tests +- `internal/pluginhost/manifest_test.go` — manifest validation tests +- `internal/pluginhost/client_test.go` — handshake and restart tests +- `internal/config/plugins.go` — plugin path helpers under `ANNA_HOME` + +**Commits:** + +- `99c7d80` — `✨ feat: scaffold subprocess plugin runtime` + +**Decisions & context for next phase:** + +- The runtime is intentionally generic and not yet wired into runner or channel + startup paths. +- First-party plugin packaging should avoid a second deployment story; the + `@anna` builtin entrypoint is the preferred bridge for Phase 2. +- Tool migration should start by defining the tool RPC contract and adapter + before converting the concrete tools. + +### Fixes + +- Removed the hidden `anna plugin runtime ...` CLI entrypoint and moved the + builtin tool runtime behind an internal env-driven boot path in `cmd/anna`, + so the subprocess bridge is no longer exposed as a normal command surface. +- Changed request cancellation in `internal/pluginhost/client.go` to terminate + the current plugin process instead of leaking a blocked response reader + goroutine. +- Added `Close()` forwarding to the sandbox wrapper so subprocess-backed file + tools actually shut down when the runner closes. +- Added `internal/agent/tool/plugin_entrypoint_test.go` so tool package tests + use a real built `anna` binary as the builtin plugin entrypoint instead of + recursively re-running the Go test binary. +- Tightened the internal runtime gate to verify a host-provided token from an + inherited file descriptor instead of trusting env vars alone. +- Bounded `Client.Close()` shutdown requests with a timeout so an unresponsive + plugin cannot block runner teardown indefinitely. +- Removed the tool runtime from the main `anna` binary entirely and moved the + subprocess bridge into the dedicated `cmd/anna-plugin` helper binary. +- Updated the test harnesses and command build task so plugin-backed tools now + use `anna-plugin` as the builtin entrypoint instead of the main binary. +- Updated GoReleaser packaging so release archives include both `anna` and + `anna-plugin`, keeping the helper binary present in shipped artifacts. +- Fixed the helper fallback path to preserve the executable suffix from the + current binary, so Windows installs resolve `anna-plugin.exe` correctly. + +## Phase 2: Tool Plugin Migration + +**Status:** complete + +**Tasks completed:** + +- Added a subprocess tool RPC bridge that serves built-in tools from the + current `anna` binary via the `@anna` builtin entrypoint. +- Routed the built-in tool registry through subprocess-backed tool adapters + while preserving the existing sandbox and working-directory behavior. +- Migrated `read`, `bash`, `edit`, `write`, and `webfetch` onto the bundled + subprocess plugin path. +- Added integration coverage that runs the real `anna` binary and verifies the + migrated tools still behave equivalently, including sandbox rejection. + +**Files changed:** + +- `internal/pluginapi/types.go` — added tool-specific manifest and RPC types +- `internal/pluginhost/server.go` — single-tool subprocess protocol server +- `internal/pluginhost/manifest.go` — builtin entrypoint and tool validation +- `internal/pluginhost/client.go` — builtin entrypoint launch bridge +- `internal/agent/tool/plugin_runtime.go` — bundled built-in tool manifest and + runtime helper +- `cmd/anna/plugin.go` — hidden `plugin runtime tool` entrypoint +- `internal/agent/plugin_entrypoint_test.go` — test helper for built binary +- `internal/agent/runner/plugin_entrypoint_test.go` — test helper for runner + package +- `internal/agent/tool/plugin_tool.go` — subprocess-backed tool adapter +- `internal/agent/tool/tool.go` — registry now wires built-in tools through the + plugin adapter +- `internal/agent/runner/gorunner.go` — closes plugin-backed tools on shutdown +- `cmd/anna/plugin_runtime_test.go` — end-to-end tool migration coverage +- `internal/pluginhost/catalog_test.go` — tool manifest validation updates +- `internal/pluginhost/client_test.go` — builtin tool handshake test updates + +**Commits:** + +- `47c34a9` — `✨ feat: add subprocess tool runtime bridge` +- `ded0fff` — `✨ feat: run built-in tools through plugins` +- `da97d2d` — `🧪 test: cover plugin-backed tool migration` + +**Decisions & context for next phase:** + +- The tool path is now fully going through the subprocess protocol for the + built-in tool set. +- `@anna` is the preferred bridge for first-party plugin packaging; there is no + second packaging format. +- JS plugins remain untouched and continue to work alongside the new runtime. +- Phase 3 can now focus on channel plugin contracts and channel loading. + +## Phase 3: Channel Plugin Migration + +**Status:** complete + +**Tasks completed:** + +- Added a channel subprocess protocol contract for start, stop, notify, and + inbound event semantics in `pluginapi` and `pluginhost`. +- Added a restartable host-side channel adapter that supervises plugin + processes and retries after crashes instead of taking down the daemon. +- Replaced the hard-coded Telegram/QQ/Feishu/Weixin constructors in the main + gateway with catalog-driven loading and bundled channel plugin definitions. +- Added a built-in `anna-plugin channel ` helper path that boots each + migrated channel as a subprocess plugin while preserving the existing DB + config and auth wiring. +- Added focused restart coverage for the channel plugin adapter. + +**Files changed:** + +- `internal/pluginapi/types.go` — channel plugin wire types and capabilities +- `internal/pluginhost/server.go` — channel protocol server loop +- `internal/pluginhost/channeladapter.go` — restartable host adapter +- `internal/pluginhost/catalog.go` — catalog merge support for bundled defs +- `internal/channel/config.go` — shared channel config JSON types +- `internal/channel/plugin_runtime.go` — bundled channel manifest definitions +- `cmd/anna-plugin/channel.go` — channel plugin bootstrapper +- `cmd/anna-plugin/main.go` — channel helper command registration +- `cmd/anna/channel_plugins.go` — catalog-driven channel loading helpers +- `cmd/anna/gateway.go` — removed hard-coded channel construction +- `internal/pluginhost/channeladapter_test.go` — channel restart coverage + +**Commits:** + +- `61e7c05` — `✨ feat: add channel plugin runtime contract` +- `cee64f2` — `✨ feat: load channels from plugin catalog` +- `6fd8499` — `🧪 test: cover channel plugin restart handling` + +**Decisions & context for next phase:** + +- Bundled first-party channels are now launched through the same subprocess + helper path as future third-party channel plugins. +- Channel config JSON remains unchanged and is still read from the DB. +- The host now treats a channel process crash as a restartable failure instead + of a daemon-level failure. +- Phase 4 can now focus on plugin binding config, JS compatibility, operator + visibility, and docs. + +### Fixes + +- Switched admin/channel link-code storage from per-process in-memory state to + a shared DB-backed store, preserving account-linking flows after the channel + subprocess migration. +- Updated both the host gateway and channel helper runtime to use the shared + link-code store so `/api/auth/profile/link-code` and `/link ` operate + across process boundaries. +- Added regression coverage that opens two DB handles against the same Anna DB + and verifies one store instance can generate a code that another instance + consumes. + +## Phase 4: Integration, Compatibility, and Docs + +**Status:** complete + +**Tasks completed:** + +- Added a separate `runtime_plugins` setting so subprocess tool and channel + bindings can change independently from the legacy JS plugin list. +- Routed the Go runner and channel gateway through runtime binding resolution, + which now makes built-in tool/channel slots replaceable by plugin ID. +- Added `anna plugin runtime list` and `anna plugin runtime bind` to expose + effective bindings, enabled channel state, and log location to operators. +- Preserved JS plugin compatibility by keeping `settings.plugins` untouched and + documenting the new boundary between JS and subprocess plugins. +- Added binding coverage for tool replacement and channel binding resolution, + then ran the full Go test suite and release config validation. + +**Files changed:** + +- `internal/config/runtime_plugins.go` — runtime binding setting types and + load/save helpers +- `internal/config/snapshot.go` — runtime binding snapshot field +- `internal/config/dbstore.go` — snapshot loading for `runtime_plugins` +- `internal/agent/tool/tool.go` — runtime tool catalog loading and binding + resolution +- `internal/agent/tool/plugin_runtime.go` — bundled tool definition helpers +- `internal/agent/factory.go` — runner config now carries runtime bindings +- `internal/agent/runner/gorunner.go` — tool registry now respects runtime + bindings +- `internal/channel/plugin_runtime.go` — bundled channel definition helpers +- `cmd/anna/channel_plugins.go` — channel binding resolution helper +- `cmd/anna/gateway.go` — gateway now resolves channel plugins via bindings +- `cmd/anna/plugin_runtime_cli.go` — runtime plugin binding and status CLI +- `cmd/anna/plugin.go` — JS plugin command wording kept distinct from runtime + plugin management +- `internal/agent/tool/runtime_bindings_test.go` — replacement-path coverage +- `cmd/anna/channel_plugins_test.go` — channel binding coverage +- `internal/config/dbstore_test.go` — JS/runtime setting compatibility coverage +- `README.md` — runtime plugin CLI and feature summary updates +- `docs/content/docs/features/plugin-system.md` — JS vs subprocess plugin docs +- `docs/content/docs/getting-started/configuration.md` — `runtime_plugins` + setting and plugin directory layout + +**Commits:** + +- `c393ba8` — `✨ feat: add runtime plugin bindings` +- `61a5c76` — `✨ feat: add runtime plugin runtime CLI` +- `1a73cce` — `🧪 test: cover runtime plugin bindings` +- `1f7013c` — `📝 docs: record runtime plugin bindings` +- `c2bfcd1` — `🧪 test: cover runtime plugin settings helpers` +- `4afcd29` — `📝 docs: clarify runtime plugin slot bindings` + +**Decisions & context for next phase:** + +- The migration slice is complete for tools and channels. +- JS plugins remain the extension path for hooks and lightweight custom tools. +- Runtime bindings are now the control point for swapping subprocess-backed + tool and channel implementations. +- Providers and memory remain deferred to later plugin phases. + +### Verification + +- `mise x -- go test ./...` +- `mise run release:check` diff --git a/.agents/sessions/2026-03-28-plugin-runtime-migration/plan.md b/.agents/sessions/2026-03-28-plugin-runtime-migration/plan.md new file mode 100644 index 00000000..376520bc --- /dev/null +++ b/.agents/sessions/2026-03-28-plugin-runtime-migration/plan.md @@ -0,0 +1,172 @@ +# Plan: Plugin Runtime Migration + +## Overview + +Build a subprocess plugin runtime and migrate existing first-party tools and +network channels onto it first, while keeping the current JS plugin path and +hard-coded builtins temporarily for compatibility. The goal is to make tools +and channels replaceable without changing Anna's external behavior. + +### Goals + +- Introduce a versioned subprocess plugin host with discovery, manifest + validation, supervision, and capability-based loading. +- Migrate built-in tools (`read`, `bash`, `edit`, `write`, `webfetch`) to + bundled first-party subprocess plugins. +- Migrate network channels (Telegram, QQ, Feishu, Weixin) to bundled + first-party subprocess plugins. +- Preserve DB-backed configuration and existing agent/session behavior during + the migration. + +### Success Criteria + +- [ ] Anna can discover bundled plugins from a plugin catalog and load them by + manifest. +- [ ] Tool execution can be routed through subprocess plugins without changing + the engine contract. +- [ ] Network channels can start, stop, and notify through subprocess plugins + instead of hard-coded constructors. +- [ ] Existing built-in tool behavior remains functionally equivalent from the + agent's perspective. +- [ ] Existing channel config stored in `settings_channels` remains usable + without manual migration. +- [ ] JS plugins remain operational during this migration slice. + +### Out of Scope + +- Providers and memory plugins. +- Replacing the CLI channel. +- Removing the current JS plugin system. +- Third-party install UX beyond the runtime/catalog support needed for bundled + plugins. + +## Technical Approach + +Introduce a subprocess plugin subsystem with four core parts: + +- `pluginapi`: manifest types, protocol envelopes, capability constants, and + version negotiation. +- `pluginhost`: discovery, validation, process supervision, stdio transport, + health checks, restart policy, and structured logging. +- adapters: host-side adapters that make subprocess plugins implement existing + Anna interfaces for tools and channels. +- bundled first-party plugin binaries: first-party executables built from repo + code and shipped as Anna-managed plugins. + +Key design decisions: + +- Use a single versioned stdio protocol for all plugin kinds. +- Keep canonical state in the host; plugins do not write directly to Anna DB + internals. +- Load tools lazily and channels eagerly. +- Preserve current DB config as the source of truth, layering plugin binding on + top during migration. +- Preserve current engine and session contracts so the migration stays mostly + at the loading and adapter layer. + +### Components + +- **Plugin API**: protocol messages, manifest schema, capability model, + handshake, and health endpoints. +- **Plugin Catalog**: bundled plugin discovery and binding resolution. +- **Tool Adapter**: exposes subprocess tool plugins as `tool.Tool`. +- **Channel Adapter**: exposes subprocess channel plugins as `channel.Channel`. +- **Bundled Plugin Binaries**: first-party executables for existing tools and + channels. +- **Compatibility Layer**: preserves current settings loading and channel + config shapes during the migration. + +## Implementation Phases + +### Phase 1: Runtime Foundation + +1. Define plugin manifest, protocol, capability model, and supervisor package + layout (files: `internal/pluginapi/*`, `internal/pluginhost/*`). +2. Implement subprocess lifecycle management, stdio framing, handshake, health, + shutdown, restart policy, and log handling (files: + `internal/pluginhost/*`). +3. Implement bundled plugin discovery and host-side registration APIs (files: + `internal/pluginhost/catalog.go`, `internal/pluginhost/manifest.go`, + `internal/config/*` as needed). +4. Add focused tests for protocol negotiation, invalid manifests, crashed + processes, timeouts, and restart behavior (files: + `internal/pluginhost/*_test.go`). + +### Phase 2: Tool Plugin Migration + +1. Introduce a subprocess tool adapter and plugin registry path that plugs into + runner setup (files: `internal/pluginhost/tooladapter.go`, + `internal/agent/tool/tool.go`, `internal/agent/runner/gorunner.go`, + `cmd/anna/commands.go`). +2. Migrate first-party tools `read`, `bash`, `edit`, `write`, and `webfetch` + into bundled subprocess plugins (files: `cmd/anna-plugin/*`, + `internal/agent/tool/*`, plugin manifests under a bundled plugin dir). +3. Preserve sandboxing and working-directory semantics now enforced in the host + tool registry. +4. Add integration tests proving equivalent tool definitions and execution + behavior. + +### Phase 3: Channel Plugin Migration + +1. Define the channel plugin contract for start, stop, inbound message + delivery, and notifications (files: `internal/pluginapi/*`, + `internal/pluginhost/channeladapter.go`). +2. Replace hard-coded channel construction with catalog-driven loading (files: + `cmd/anna/gateway.go`, `internal/pluginhost/*`). +3. Migrate Telegram, QQ, Feishu, and Weixin as bundled channel plugins while + preserving existing config schemas and auth wiring (files: + `internal/channel/*`, `cmd/anna-plugin/*`, bundled plugin manifests). +4. Add supervision, restart, and partial-failure handling so a crashed channel + plugin does not terminate the daemon. + +### Phase 4: Integration, Compatibility, and Docs + +1. Add plugin binding config for bundled tools and channels without breaking + current DB settings (files: `internal/config/*`, `cmd/anna/*`, + admin/config surfaces as needed). +2. Keep JS plugins working and document migration boundaries clearly (files: + `docs/*`, `README.md`). +3. Add operator-facing status and log surfaces for plugin processes (files: + `internal/admin/*`, `internal/pluginhost/*` as needed). +4. Document the bundled plugin model and the migration path for later provider + and memory work. + +## Testing Strategy + +- Unit tests for manifest parsing, protocol framing, handshake validation, and + supervisor lifecycle. +- Integration tests for subprocess tool execution under success, timeout, + stderr noise, invalid JSON, and crash scenarios. +- Integration tests for channel startup, inbound event bridging, + notifications, and restart-after-crash behavior. +- Regression tests ensuring current channel configs still deserialize and + function. +- End-to-end smoke tests covering daemon boot with bundled plugins only. + +## Risks + +| Risk | Impact | Mitigation | +| ---- | ------ | ---------- | +| Protocol scope grows too quickly | High | Keep v1 limited to tools and channels only; defer providers and memory | +| Tool sandbox behavior regresses | High | Preserve host-side sandbox enforcement and add regression coverage for each migrated tool | +| Channel plugin crash loops | High | Add restart backoff, health checks, and isolate failure per channel | +| Packaging bundled plugin executables becomes messy | Medium | Standardize manifest and output layout early and wire it into the build | +| Compatibility layer becomes permanent debt | Medium | Make the deprecation boundary explicit and follow up after tools/channels land | + +## Open Questions + +- None. Earlier scope questions were converted into explicit assumptions. + +## Review Feedback + +- Self-review: the plan addresses the approved scope, keeps phases ordered + around runtime first, and defers providers/memory to avoid overloading v1. + +## Final Status + +Implementation complete on branch `feat/plugin-runtime-migration`. + +- Phase 1: runtime foundation — complete +- Phase 2: tool subprocess migration — complete +- Phase 3: channel subprocess migration — complete +- Phase 4: bindings, operator visibility, compatibility, and docs — complete diff --git a/.agents/sessions/2026-03-28-plugin-runtime-migration/tasks.md b/.agents/sessions/2026-03-28-plugin-runtime-migration/tasks.md new file mode 100644 index 00000000..c2c0d88f --- /dev/null +++ b/.agents/sessions/2026-03-28-plugin-runtime-migration/tasks.md @@ -0,0 +1,36 @@ +# Tasks: Plugin Runtime Migration + +## Phase 1: Runtime Foundation + +- [x] Define manifest schema, protocol envelopes, and capability constants +- [x] Implement subprocess supervisor and stdio transport +- [x] Implement bundled plugin catalog/discovery +- [x] Add protocol and supervisor tests + +## Phase 2: Tool Plugin Migration + +- [x] Add subprocess tool adapter to the runner/tool registry path +- [x] Migrate `read` to a bundled plugin +- [x] Migrate `bash` to a bundled plugin +- [x] Migrate `edit` to a bundled plugin +- [x] Migrate `write` to a bundled plugin +- [x] Migrate `webfetch` to a bundled plugin +- [x] Add tool integration coverage + +## Phase 3: Channel Plugin Migration + +- [x] Define channel plugin contract and host adapter +- [x] Replace hard-coded channel loading with catalog-driven startup +- [x] Migrate Telegram channel plugin +- [x] Migrate QQ channel plugin +- [x] Migrate Feishu channel plugin +- [x] Migrate Weixin channel plugin +- [x] Add channel supervision and restart coverage + +## Phase 4: Integration, Compatibility, and Docs + +- [x] Add plugin binding config for bundled tools/channels +- [x] Preserve JS plugin compatibility +- [x] Add plugin status/log visibility +- [x] Document runtime, bundled plugin layout, and migration boundaries +- [x] Run full test suite diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 7a6e13e7..837acae6 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -22,9 +22,24 @@ builds: hooks: pre: - bash scripts/download-tools.sh --goos {{ .Os }} --goarch {{ .Arch }} + - id: anna-plugin + main: ./cmd/anna-plugin + binary: anna-plugin + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + - windows + goarch: + - amd64 + - arm64 archives: - id: anna + ids: + - anna + - anna-plugin changelog: sort: asc diff --git a/README.md b/README.md index 64772cf3..d36bf914 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,9 @@ Run multiple agents at once. A coding assistant, a writing partner, a daily plan Multiple users out of the box. Users are auto-created from platform identity (Telegram user ID, QQ ID, etc). Each user gets per-agent memory stored in the database, so Anna remembers different things about different people. -And the whole thing is a single Go binary with a SQLite database. Your machine, your API keys, nothing leaves your network. +And the whole thing is a Go CLI with a SQLite database. Your machine, your API keys, nothing leaves your network. + +Extensibility now has two layers: JavaScript plugins for lightweight hooks and tools, and subprocess runtime plugins for replaceable built-in tools and channels. ## How it works @@ -184,6 +186,9 @@ anna models set

# Switch model (e.g. openai/gpt-4o) anna models search # Search models anna skills search # Search skills.sh anna skills install # Install a skill +anna plugin list # List configured JavaScript plugins +anna plugin runtime list # Show runtime tool/channel bindings +anna plugin runtime bind tool read tool/read anna version # Print version anna upgrade # Self-update to latest release ``` @@ -203,7 +208,7 @@ anna upgrade # Self-update to latest release | [Feishu Bot](docs/content/docs/channels/feishu.md) | Bot setup, WebSocket, streaming | | [WeChat Bot](docs/content/docs/channels/weixin.md) | iLink Bot setup, QR login, DM | | [Scheduler System](docs/content/docs/features/scheduler-system.md) | Scheduler system, heartbeat, persistence | -| [Plugin System](docs/content/docs/features/plugin-system.md) | JavaScript plugins, tools, hooks | +| [Plugin System](docs/content/docs/features/plugin-system.md) | JavaScript hooks plus subprocess tool/channel plugins | | [Notification System](docs/content/docs/features/notification-system.md) | Dispatcher, backends, routing | ## Development diff --git a/cmd/anna-plugin/channel.go b/cmd/anna-plugin/channel.go new file mode 100644 index 00000000..04eaaa85 --- /dev/null +++ b/cmd/anna-plugin/channel.go @@ -0,0 +1,288 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "time" + + "github.com/vaayne/anna/internal/agent" + agenttool "github.com/vaayne/anna/internal/agent/tool" + "github.com/vaayne/anna/internal/auth" + channelplugin "github.com/vaayne/anna/internal/channel" + "github.com/vaayne/anna/internal/channel/feishu" + "github.com/vaayne/anna/internal/channel/qq" + "github.com/vaayne/anna/internal/channel/telegram" + "github.com/vaayne/anna/internal/channel/weixin" + "github.com/vaayne/anna/internal/config" + appdb "github.com/vaayne/anna/internal/db" + "github.com/vaayne/anna/internal/memory" + memorytool "github.com/vaayne/anna/internal/memory/tool" + pluginmgr "github.com/vaayne/anna/internal/plugin" + "github.com/vaayne/anna/internal/pluginapi" + "github.com/vaayne/anna/internal/pluginhost" +) + +type channelPluginRuntime struct { + bot channelplugin.Channel +} + +func (r channelPluginRuntime) Start(ctx context.Context) error { + return r.bot.Start(ctx) +} + +func (r channelPluginRuntime) Notify(ctx context.Context, n pluginapi.ChannelNotification) error { + return r.bot.Notify(ctx, channelplugin.Notification(n)) +} + +type channelRuntimeDeps struct { + db *sql.DB + store config.Store + authStore auth.AuthStore + engine *auth.PolicyEngine + linkCodes *auth.LinkCodeStore + poolManager *agent.PoolManager + pool *agent.Pool + snap *config.Snapshot + listFn channelplugin.ModelListFunc + switchFn channelplugin.ModelSwitchFunc +} + +func buildChannelPlugin(name, workDir, userDataDir string) (pluginhost.Definition, channelPluginRuntime, error) { + deps, err := newChannelRuntime(context.Background(), workDir, userDataDir) + if err != nil { + return pluginhost.Definition{}, channelPluginRuntime{}, err + } + + def, err := channelplugin.BuiltinChannelDefinition(name, workDir, userDataDir) + if err != nil { + return pluginhost.Definition{}, channelPluginRuntime{}, err + } + + switch name { + case "telegram": + cfg := channelplugin.LoadConfig[channelplugin.TelegramConfig](deps.store, name) + if cfg == nil || cfg.Token == "" { + return pluginhost.Definition{}, channelPluginRuntime{}, fmt.Errorf("telegram: missing channel config") + } + bot, err := telegram.New(telegram.Config{ + Token: cfg.Token, + ChannelID: cfg.ChannelID, + GroupMode: cfg.GroupMode, + }, deps.poolManager, deps.store, deps.listFn, deps.switchFn, + telegram.WithAuth(deps.authStore, deps.engine, deps.linkCodes), + ) + if err != nil { + return pluginhost.Definition{}, channelPluginRuntime{}, err + } + return def, channelPluginRuntime{bot: bot}, nil + + case "qq": + cfg := channelplugin.LoadConfig[channelplugin.QQConfig](deps.store, name) + if cfg == nil || cfg.AppID == "" || cfg.AppSecret == "" { + return pluginhost.Definition{}, channelPluginRuntime{}, fmt.Errorf("qq: missing channel config") + } + bot, err := qq.New(qq.Config{ + AppID: cfg.AppID, + AppSecret: cfg.AppSecret, + GroupMode: cfg.GroupMode, + }, deps.poolManager, deps.store, deps.listFn, deps.switchFn, + qq.WithAuth(deps.authStore, deps.engine, deps.linkCodes), + ) + if err != nil { + return pluginhost.Definition{}, channelPluginRuntime{}, err + } + return def, channelPluginRuntime{bot: bot}, nil + + case "feishu": + cfg := channelplugin.LoadConfig[channelplugin.FeishuConfig](deps.store, name) + if cfg == nil || cfg.AppID == "" || cfg.AppSecret == "" { + return pluginhost.Definition{}, channelPluginRuntime{}, fmt.Errorf("feishu: missing channel config") + } + groups := make(map[string]feishu.GroupConfig, len(cfg.Groups)) + for k, v := range cfg.Groups { + groups[k] = feishu.GroupConfig{ + GroupMode: v.GroupMode, + SystemPrompt: v.SystemPrompt, + ToolAllow: v.ToolAllow, + ToolDeny: v.ToolDeny, + } + } + bot, err := feishu.New(feishu.Config{ + AppID: cfg.AppID, + AppSecret: cfg.AppSecret, + EncryptKey: cfg.EncryptKey, + VerificationToken: cfg.VerificationToken, + GroupMode: cfg.GroupMode, + Groups: groups, + }, deps.poolManager, deps.store, deps.listFn, deps.switchFn, + feishu.WithAuth(deps.authStore, deps.engine, deps.linkCodes), + ) + if err != nil { + return pluginhost.Definition{}, channelPluginRuntime{}, err + } + return def, channelPluginRuntime{bot: bot}, nil + + case "weixin": + cfg := channelplugin.LoadConfig[channelplugin.WeixinConfig](deps.store, name) + if cfg == nil || cfg.BotToken == "" { + return pluginhost.Definition{}, channelPluginRuntime{}, fmt.Errorf("weixin: missing channel config") + } + bot, err := weixin.New(weixin.Config{ + BotToken: cfg.BotToken, + BaseURL: cfg.BaseURL, + BotID: cfg.BotID, + UserID: cfg.UserID, + }, deps.poolManager, deps.store, deps.listFn, deps.switchFn, + weixin.WithAuth(deps.authStore, deps.engine, deps.linkCodes), + ) + if err != nil { + return pluginhost.Definition{}, channelPluginRuntime{}, err + } + return def, channelPluginRuntime{bot: bot}, nil + } + + return pluginhost.Definition{}, channelPluginRuntime{}, fmt.Errorf("unknown channel plugin: %s", name) +} + +func newChannelRuntime(ctx context.Context, workDir, userDataDir string) (*channelRuntimeDeps, error) { + dbPath := config.DBPath() + db, err := appdb.OpenDB(dbPath) + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + + store := config.NewDBStore(db) + if err := store.SeedDefaults(ctx); err != nil { + return nil, fmt.Errorf("seed defaults: %w", err) + } + + authStore := appdb.NewAuthStore(db) + if err := auth.SeedPolicies(ctx, authStore); err != nil { + return nil, fmt.Errorf("seed auth: %w", err) + } + engine, err := auth.NewEngine(ctx, authStore) + if err != nil { + return nil, fmt.Errorf("create auth engine: %w", err) + } + + agents, err := store.ListEnabledAgents(ctx) + if err != nil || len(agents) == 0 { + return nil, fmt.Errorf("no enabled agents found") + } + defaultAgentID := agents[0].ID + + snap, err := store.Snapshot(ctx, defaultAgentID) + if err != nil { + return nil, fmt.Errorf("load config snapshot: %w", err) + } + + memoryEngine := memory.NewEngineFromDB(db, &memory.StaticSummarizer{Response: "compacted"}, memory.WithLogger(slog.Default())) + userMemoryStore := memory.NewUserMemoryStore(store) + sharedTools := []agenttool.Tool{ + memorytool.NewMemoryTool(memoryEngine, userMemoryStore), + } + + builtinNames := agenttool.BuiltinToolNames() + builtinNames = append(builtinNames, "delegate", "skills") + for _, t := range sharedTools { + builtinNames = append(builtinNames, t.Definition().Name) + } + + pm := pluginmgr.NewManager(slog.Default(), builtinNames) + pm.LoadAll(snap.Plugins) + for _, pt := range pm.Registry().Tools() { + sharedTools = append(sharedTools, pluginmgr.AdaptTool(pt)) + } + + idleTimeout := time.Duration(snap.Runner.IdleTimeout) * time.Minute + poolMgr := agent.NewPoolManager(store, memoryEngine, + agent.WithIdleTimeoutPM(idleTimeout), + agent.WithCompactionPM(agent.CompactionConfig{ + MaxTokens: snap.Runner.Compaction.MaxTokens, + KeepTail: snap.Runner.Compaction.KeepTail, + }.WithDefaults()), + agent.WithSharedExtraTools(sharedTools), + agent.WithPluginHooksPM(pm.Registry()), + ) + if err := poolMgr.StartAll(ctx); err != nil { + return nil, fmt.Errorf("start pool manager: %w", err) + } + + pool := poolMgr.Get(defaultAgentID) + if pool == nil { + pool = poolMgr.DefaultPool() + } + + listFn := func() []channelplugin.ModelOption { + return collectChannelModels(ctx, store, snap) + } + switchFn := func(provider, model string) error { + snap.Provider = provider + snap.Model = model + + if p, err := store.GetProvider(context.Background(), provider); err == nil { + snap.APIKey = p.APIKey + snap.BaseURL = p.BaseURL + } + + factory, err := agent.NewRunnerFactory(snap, sharedTools, pm.Registry()) + if err != nil { + return err + } + pool.SetFactory(factory) + pool.SetDefaultModel(model) + return nil + } + + linkCodes, err := auth.NewSharedLinkCodeStore(ctx, db) + if err != nil { + return nil, fmt.Errorf("create link code store: %w", err) + } + + return &channelRuntimeDeps{ + db: db, + store: store, + authStore: authStore, + engine: engine, + linkCodes: linkCodes, + poolManager: poolMgr, + pool: pool, + snap: snap, + listFn: listFn, + switchFn: switchFn, + }, nil +} + +func collectChannelModels(ctx context.Context, store config.Store, snap *config.Snapshot) []channelplugin.ModelOption { + seen := make(map[string]bool) + var models []channelplugin.ModelOption + + add := func(provider, model string) { + key := provider + "/" + model + if seen[key] { + return + } + seen[key] = true + models = append(models, channelplugin.ModelOption{Provider: provider, Model: model}) + } + + add(snap.Provider, snap.Model) + + if cache, err := config.LoadModelsCache(); err == nil { + for _, m := range cache.Models { + add(m.Provider, m.Model) + } + return models + } + + providers, err := store.ListProviders(ctx) + if err == nil { + for _, prov := range providers { + add(prov.ID, snap.Model) + } + } + + return models +} diff --git a/cmd/anna-plugin/main.go b/cmd/anna-plugin/main.go new file mode 100644 index 00000000..3a862ffd --- /dev/null +++ b/cmd/anna-plugin/main.go @@ -0,0 +1,97 @@ +package main + +import ( + "fmt" + "log/slog" + "os" + + ucli "github.com/urfave/cli/v2" + agenttool "github.com/vaayne/anna/internal/agent/tool" + "github.com/vaayne/anna/internal/pluginhost" +) + +func main() { + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + + app := &ucli.App{ + Name: "anna-plugin", + Usage: "Internal Anna plugin helper", + Commands: []*ucli.Command{ + toolCommand(), + channelCommand(), + }, + } + + if err := app.Run(os.Args); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +func channelCommand() *ucli.Command { + return &ucli.Command{ + Name: "channel", + Usage: "Run a built-in channel plugin", + Flags: []ucli.Flag{ + &ucli.StringFlag{Name: "work-dir"}, + &ucli.StringFlag{Name: "user-data-dir"}, + }, + Action: func(c *ucli.Context) error { + name := c.Args().First() + if name == "" { + return fmt.Errorf("usage: anna-plugin channel ") + } + + workDir := c.String("work-dir") + if workDir == "" { + workDir = os.Getenv("ANNA_PLUGIN_WORKDIR") + } + userDataDir := c.String("user-data-dir") + if userDataDir == "" { + userDataDir = os.Getenv("ANNA_PLUGIN_USER_DATA_DIR") + } + + def, runtime, err := buildChannelPlugin(name, workDir, userDataDir) + if err != nil { + return err + } + + return pluginhost.ServeChannel(c.Context, def, runtime, os.Stdin, os.Stdout) + }, + } +} + +func toolCommand() *ucli.Command { + return &ucli.Command{ + Name: "tool", + Usage: "Run a built-in tool plugin", + Flags: []ucli.Flag{ + &ucli.StringFlag{Name: "work-dir"}, + &ucli.StringFlag{Name: "user-data-dir"}, + }, + Action: func(c *ucli.Context) error { + name := c.Args().First() + if name == "" { + return fmt.Errorf("usage: anna-plugin tool ") + } + + workDir := c.String("work-dir") + if workDir == "" { + workDir = os.Getenv("ANNA_PLUGIN_WORKDIR") + } + userDataDir := c.String("user-data-dir") + if userDataDir == "" { + userDataDir = os.Getenv("ANNA_PLUGIN_USER_DATA_DIR") + } + + def, runtime, err := agenttool.BuiltinToolPlugin(name, workDir, userDataDir) + if err != nil { + return err + } + + return pluginhost.ServeTool(c.Context, def, runtime, os.Stdin, os.Stdout) + }, + } +} diff --git a/cmd/anna/channel_plugins.go b/cmd/anna/channel_plugins.go new file mode 100644 index 00000000..88b6a0de --- /dev/null +++ b/cmd/anna/channel_plugins.go @@ -0,0 +1,69 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "os" + + "github.com/vaayne/anna/internal/channel" + "github.com/vaayne/anna/internal/config" + "github.com/vaayne/anna/internal/pluginapi" + "github.com/vaayne/anna/internal/pluginhost" +) + +type pluginChannel struct { + name string + adapter *pluginhost.ChannelAdapter +} + +func (c *pluginChannel) Name() string { return c.name } + +func (c *pluginChannel) Start(ctx context.Context) error { return c.adapter.Start(ctx) } + +func (c *pluginChannel) Stop() { c.adapter.Stop() } + +func (c *pluginChannel) Notify(ctx context.Context, n channel.Notification) error { + return c.adapter.Notify(ctx, pluginapi.ChannelNotification{ + Channel: n.Channel, + ChatID: n.ChatID, + Text: n.Text, + Silent: n.Silent, + }) +} + +func loadRuntimeChannelCatalog(workDir, userDataDir string) (*pluginhost.Catalog, error) { + roots := []string{} + for _, root := range []string{config.BundledPluginsPath(), config.InstalledPluginsPath()} { + if _, err := os.Stat(root); err == nil { + roots = append(roots, root) + } + } + catalog, err := pluginhost.Discover(roots...) + if err != nil { + return nil, err + } + if err := catalog.Merge(channel.BuiltinChannelDefinitions(workDir, userDataDir)...); err != nil { + return nil, err + } + return catalog, nil +} + +func resolveChannelPluginDefinition(catalog *pluginhost.Catalog, bindings config.RuntimePluginBindings, name string) (pluginhost.Definition, error) { + id := bindings.ChannelBinding(name) + def, ok := catalog.Get(id) + if !ok { + return pluginhost.Definition{}, fmt.Errorf("channel %s bound to missing plugin %s", name, id) + } + if def.Manifest.Kind != pluginapi.KindChannel { + return pluginhost.Definition{}, fmt.Errorf("channel %s bound to non-channel plugin %s", name, id) + } + return def, nil +} + +func newChannelPlugin(name string, def pluginhost.Definition) channel.Channel { + return &pluginChannel{ + name: name, + adapter: pluginhost.NewChannelAdapter(def, pluginhost.SupervisorOptions{Logger: slog.Default().With("channel", name)}), + } +} diff --git a/cmd/anna/channel_plugins_test.go b/cmd/anna/channel_plugins_test.go new file mode 100644 index 00000000..d5a868f5 --- /dev/null +++ b/cmd/anna/channel_plugins_test.go @@ -0,0 +1,52 @@ +package main + +import ( + "testing" + + "github.com/vaayne/anna/internal/config" + "github.com/vaayne/anna/internal/pluginapi" + "github.com/vaayne/anna/internal/pluginhost" +) + +func TestResolveChannelPluginDefinitionUsesRuntimeBinding(t *testing.T) { + catalog := pluginhost.NewCatalog() + err := catalog.Add(pluginhost.Definition{ + Manifest: pluginapi.Manifest{ + Name: "replacement-telegram", + Version: "1.0.0", + Kind: pluginapi.KindChannel, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: pluginhost.BuiltinEntrypoint, + }, + }) + if err != nil { + t.Fatalf("catalog.Add: %v", err) + } + + bindings := config.DefaultRuntimePluginBindings() + bindings.Channels["telegram"] = "channel/replacement-telegram" + + def, err := resolveChannelPluginDefinition(catalog, bindings, "telegram") + if err != nil { + t.Fatalf("resolveChannelPluginDefinition: %v", err) + } + if got := def.ID(); got != "channel/replacement-telegram" { + t.Fatalf("resolveChannelPluginDefinition() = %q", got) + } +} + +func TestPluginChannelKeepsSlotName(t *testing.T) { + ch := newChannelPlugin("telegram", pluginhost.Definition{ + Manifest: pluginapi.Manifest{ + Name: "replacement-telegram", + Version: "1.0.0", + Kind: pluginapi.KindChannel, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: pluginhost.BuiltinEntrypoint, + }, + }) + + if got := ch.Name(); got != "telegram" { + t.Fatalf("Name() = %q, want slot name telegram", got) + } +} diff --git a/cmd/anna/commands.go b/cmd/anna/commands.go index 63787f2b..d7409c36 100644 --- a/cmd/anna/commands.go +++ b/cmd/anna/commands.go @@ -125,8 +125,7 @@ func setup(parent context.Context, gateway bool) (*setupResult, error) { ) // Collect built-in tool names for plugin collision detection. - builtinReg := agenttool.NewRegistry("") - builtinNames := builtinReg.BuiltinNames() + builtinNames := agenttool.BuiltinToolNames() builtinNames = append(builtinNames, "delegate", "skills") for _, t := range sharedTools { builtinNames = append(builtinNames, t.Definition().Name) diff --git a/cmd/anna/gateway.go b/cmd/anna/gateway.go index 6692e700..634219a2 100644 --- a/cmd/anna/gateway.go +++ b/cmd/anna/gateway.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "fmt" "log/slog" "net" @@ -19,10 +18,6 @@ import ( "github.com/vaayne/anna/internal/agent" "github.com/vaayne/anna/internal/auth" "github.com/vaayne/anna/internal/channel" - "github.com/vaayne/anna/internal/channel/feishu" - "github.com/vaayne/anna/internal/channel/qq" - "github.com/vaayne/anna/internal/channel/telegram" - "github.com/vaayne/anna/internal/channel/weixin" "github.com/vaayne/anna/internal/config" appdb "github.com/vaayne/anna/internal/db" "github.com/vaayne/anna/internal/scheduler" @@ -76,7 +71,10 @@ func runServer(ctx context.Context, s *setupResult, listFn channel.ModelListFunc } // Link codes are shared between admin panel and channel bots. - linkCodes := auth.NewLinkCodeStore() + linkCodes, err := auth.NewSharedLinkCodeStore(gctx, s.db) + if err != nil { + return fmt.Errorf("create link code store: %w", err) + } // Start admin panel server. if adminPort > 0 { @@ -108,97 +106,40 @@ func runServer(ctx context.Context, s *setupResult, listFn channel.ModelListFunc }) } - // Load channel configs from DB. - tgCfg := loadChannelConfig[telegramChannelConfig](s.store, "telegram") - qqCfg := loadChannelConfig[qqChannelConfig](s.store, "qq") - fsCfg := loadChannelConfig[feishuChannelConfig](s.store, "feishu") - wxCfg := loadChannelConfig[weixinChannelConfig](s.store, "weixin") - - // --- Telegram --- - if tgCfg != nil && tgCfg.Token != "" { - slog.Info("starting telegram bot") - - tgBot, err := telegram.New(telegram.Config{ - Token: tgCfg.Token, - ChannelID: tgCfg.ChannelID, - GroupMode: tgCfg.GroupMode, - }, s.poolManager, s.store, listFn, switchFn, - telegram.WithAuth(as, engine, linkCodes), - ) - if err != nil { - return fmt.Errorf("create telegram bot: %w", err) - } - - channels = append(channels, tgBot) - if tgCfg.EnableNotify { - s.notifier.Register(tgBot) - } + catalog, err := loadRuntimeChannelCatalog(s.snap.Workspace, config.AnnaHome()) + if err != nil { + return fmt.Errorf("load channel catalog: %w", err) } - // --- QQ --- - if qqCfg != nil && qqCfg.AppID != "" && qqCfg.AppSecret != "" { - slog.Info("starting qq bot") - - qqBot, err := qq.New(qq.Config{ - AppID: qqCfg.AppID, - AppSecret: qqCfg.AppSecret, - GroupMode: qqCfg.GroupMode, - }, s.poolManager, s.store, listFn, switchFn, - qq.WithAuth(as, engine, linkCodes), - ) - if err != nil { - return fmt.Errorf("create qq bot: %w", err) - } - - channels = append(channels, qqBot) - if qqCfg.EnableNotify { - s.notifier.Register(qqBot) - } + type channelSpec struct { + name string + notify bool } - // --- Feishu --- - if fsCfg != nil && fsCfg.AppID != "" && fsCfg.AppSecret != "" { - slog.Info("starting feishu bot") - - fsBot, err := feishu.New(feishu.Config{ - AppID: fsCfg.AppID, - AppSecret: fsCfg.AppSecret, - EncryptKey: fsCfg.EncryptKey, - VerificationToken: fsCfg.VerificationToken, - GroupMode: fsCfg.GroupMode, - Groups: fsCfg.Groups, - }, s.poolManager, s.store, listFn, switchFn, - feishu.WithAuth(as, engine, linkCodes), - ) - if err != nil { - return fmt.Errorf("create feishu bot: %w", err) - } - - channels = append(channels, fsBot) - if fsCfg.EnableNotify { - s.notifier.Register(fsBot) - } + specs := []channelSpec{} + if tgCfg := channel.LoadConfig[channel.TelegramConfig](s.store, "telegram"); tgCfg != nil && tgCfg.Token != "" { + specs = append(specs, channelSpec{name: "telegram", notify: tgCfg.EnableNotify}) + } + if qqCfg := channel.LoadConfig[channel.QQConfig](s.store, "qq"); qqCfg != nil && qqCfg.AppID != "" && qqCfg.AppSecret != "" { + specs = append(specs, channelSpec{name: "qq", notify: qqCfg.EnableNotify}) + } + if fsCfg := channel.LoadConfig[channel.FeishuConfig](s.store, "feishu"); fsCfg != nil && fsCfg.AppID != "" && fsCfg.AppSecret != "" { + specs = append(specs, channelSpec{name: "feishu", notify: fsCfg.EnableNotify}) + } + if wxCfg := channel.LoadConfig[channel.WeixinConfig](s.store, "weixin"); wxCfg != nil && wxCfg.BotToken != "" { + specs = append(specs, channelSpec{name: "weixin", notify: wxCfg.EnableNotify}) } - // --- Weixin --- - if wxCfg != nil && wxCfg.BotToken != "" { - slog.Info("starting weixin bot") - - wxBot, err := weixin.New(weixin.Config{ - BotToken: wxCfg.BotToken, - BaseURL: wxCfg.BaseURL, - BotID: wxCfg.BotID, - UserID: wxCfg.UserID, - }, s.poolManager, s.store, listFn, switchFn, - weixin.WithAuth(as, engine, linkCodes), - ) + for _, spec := range specs { + def, err := resolveChannelPluginDefinition(catalog, s.snap.RuntimePlugins, spec.name) if err != nil { - return fmt.Errorf("create weixin bot: %w", err) + return err } - - channels = append(channels, wxBot) - if wxCfg.EnableNotify { - s.notifier.Register(wxBot) + slog.Info("starting channel plugin", "channel", spec.name, "plugin", def.ID()) + ch := newChannelPlugin(spec.name, def) + channels = append(channels, ch) + if spec.notify { + s.notifier.Register(ch) } } @@ -315,52 +256,3 @@ func launchBrowser(url string) { go func() { _ = cmd.Wait() }() } } - -// --- Channel config types for JSON deserialization --- - -type telegramChannelConfig struct { - Token string `json:"token"` - ChannelID string `json:"channel_id"` - GroupMode string `json:"group_mode"` - EnableNotify bool `json:"enable_notify"` -} - -type qqChannelConfig struct { - AppID string `json:"app_id"` - AppSecret string `json:"app_secret"` - GroupMode string `json:"group_mode"` - EnableNotify bool `json:"enable_notify"` -} - -type feishuChannelConfig struct { - AppID string `json:"app_id"` - AppSecret string `json:"app_secret"` - EncryptKey string `json:"encrypt_key"` - VerificationToken string `json:"verification_token"` - GroupMode string `json:"group_mode"` - Groups map[string]feishu.GroupConfig `json:"groups"` - EnableNotify bool `json:"enable_notify"` -} - -type weixinChannelConfig struct { - BotToken string `json:"bot_token"` - BaseURL string `json:"base_url"` - BotID string `json:"bot_id"` - UserID string `json:"user_id"` - EnableNotify bool `json:"enable_notify"` -} - -// loadChannelConfig loads a channel's JSON config from the store and -// deserializes it into the given type. Returns nil if not found. -func loadChannelConfig[T any](store config.Store, channelID string) *T { - ch, err := store.GetChannel(context.Background(), channelID) - if err != nil { - return nil - } - var cfg T - if err := json.Unmarshal([]byte(ch.Config), &cfg); err != nil { - slog.Warn("failed to parse channel config", "channel", channelID, "error", err) - return nil - } - return &cfg -} diff --git a/cmd/anna/plugin.go b/cmd/anna/plugin.go index 0afb6083..abce074f 100644 --- a/cmd/anna/plugin.go +++ b/cmd/anna/plugin.go @@ -21,6 +21,7 @@ func pluginCommand() *ucli.Command { pluginListCommand(), pluginAddCommand(), pluginRemoveCommand(), + runtimePluginCommand(), }, Action: func(c *ucli.Context) error { return pluginListAction() @@ -50,7 +51,7 @@ func pluginListAction() error { } if len(plugins) == 0 { - fmt.Println("No plugins configured.") + fmt.Println("No JavaScript plugins configured.") return nil } @@ -65,7 +66,7 @@ func pluginListAction() error { func pluginAddCommand() *ucli.Command { return &ucli.Command{ Name: "add", - Usage: "Add a JS plugin", + Usage: "Add a JavaScript plugin", ArgsUsage: "", Flags: []ucli.Flag{ &ucli.StringSliceFlag{ @@ -131,7 +132,7 @@ func pluginRemoveCommand() *ucli.Command { return &ucli.Command{ Name: "remove", Aliases: []string{"rm"}, - Usage: "Remove a plugin by name or path", + Usage: "Remove a JavaScript plugin by name or path", ArgsUsage: "", Action: func(c *ucli.Context) error { target := c.Args().First() diff --git a/cmd/anna/plugin_runtime_cli.go b/cmd/anna/plugin_runtime_cli.go new file mode 100644 index 00000000..008337a0 --- /dev/null +++ b/cmd/anna/plugin_runtime_cli.go @@ -0,0 +1,224 @@ +package main + +import ( + "context" + "fmt" + + ucli "github.com/urfave/cli/v2" + agenttool "github.com/vaayne/anna/internal/agent/tool" + "github.com/vaayne/anna/internal/channel" + "github.com/vaayne/anna/internal/config" + "github.com/vaayne/anna/internal/pluginhost" +) + +func runtimePluginCommand() *ucli.Command { + return &ucli.Command{ + Name: "runtime", + Usage: "Manage subprocess runtime plugins", + Subcommands: []*ucli.Command{ + runtimePluginListCommand(), + runtimePluginBindCommand(), + }, + Action: func(c *ucli.Context) error { + return runtimePluginListAction(c) + }, + } +} + +func runtimePluginListCommand() *ucli.Command { + return &ucli.Command{ + Name: "list", + Usage: "List effective runtime plugin bindings for tools and channels", + Action: runtimePluginListAction, + } +} + +func runtimePluginListAction(c *ucli.Context) error { + store, err := openStore() + if err != nil { + return err + } + snap, err := defaultSnapshot(c.Context, store) + if err != nil { + return err + } + bindings, err := config.LoadRuntimePluginBindings(store) + if err != nil { + return err + } + + toolCatalog, err := agenttool.LoadCatalog(snap.Workspace, config.AnnaHome()) + if err != nil { + return err + } + channelCatalog, err := loadRuntimeChannelCatalog(snap.Workspace, config.AnnaHome()) + if err != nil { + return err + } + + fmt.Println("Runtime tool bindings:") + fmt.Printf("%-10s %-24s %-10s %s\n", "SLOT", "PLUGIN", "SOURCE", "VERSION") + for _, name := range agenttool.BuiltinToolNames() { + id := bindings.ToolBinding(name) + def, _ := toolCatalog.Get(id) + fmt.Printf("%-10s %-24s %-10s %s\n", name, id, runtimePluginSource(def), def.Manifest.Version) + } + + fmt.Println() + fmt.Println("Runtime channel bindings:") + fmt.Printf("%-10s %-24s %-10s %-8s %s\n", "SLOT", "PLUGIN", "SOURCE", "ENABLED", "VERSION") + channelEnabled, err := enabledChannels(c.Context, store) + if err != nil { + return err + } + for _, name := range channel.BuiltinChannelNames() { + id := bindings.ChannelBinding(name) + def, _ := channelCatalog.Get(id) + fmt.Printf("%-10s %-24s %-10s %-8s %s\n", name, id, runtimePluginSource(def), yesNo(channelEnabled[name]), def.Manifest.Version) + } + + fmt.Printf("\nLogs: %s\n", config.AnnaHome()+"/anna.log") + return nil +} + +func runtimePluginBindCommand() *ucli.Command { + return &ucli.Command{ + Name: "bind", + Usage: "Bind a tool or channel slot to a runtime plugin ID", + ArgsUsage: " [plugin-id]", + Flags: []ucli.Flag{ + &ucli.BoolFlag{ + Name: "default", + Usage: "Reset the slot back to its default bundled plugin", + }, + }, + Action: runtimePluginBindAction, + } +} + +func runtimePluginBindAction(c *ucli.Context) error { + kind := c.Args().Get(0) + slot := c.Args().Get(1) + pluginID := c.Args().Get(2) + if kind == "" || slot == "" { + return fmt.Errorf("usage: anna plugin runtime bind [plugin-id]") + } + + store, err := openStore() + if err != nil { + return err + } + snap, err := defaultSnapshot(c.Context, store) + if err != nil { + return err + } + bindings, err := config.LoadRuntimePluginBindings(store) + if err != nil { + return err + } + + switch kind { + case "tool": + if !contains(agenttool.BuiltinToolNames(), slot) { + return fmt.Errorf("unknown tool slot %q", slot) + } + if c.Bool("default") { + pluginID = config.DefaultRuntimePluginBindings().ToolBinding(slot) + } + if pluginID == "" { + return fmt.Errorf("plugin id is required unless --default is set") + } + catalog, err := agenttool.LoadCatalog(snap.Workspace, config.AnnaHome()) + if err != nil { + return err + } + if err := validateRuntimeBinding(catalog, pluginID, "tool"); err != nil { + return err + } + if bindings.Tools == nil { + bindings.Tools = map[string]string{} + } + bindings.Tools[slot] = pluginID + + case "channel": + if !contains(channel.BuiltinChannelNames(), slot) { + return fmt.Errorf("unknown channel slot %q", slot) + } + if c.Bool("default") { + pluginID = config.DefaultRuntimePluginBindings().ChannelBinding(slot) + } + if pluginID == "" { + return fmt.Errorf("plugin id is required unless --default is set") + } + catalog, err := loadRuntimeChannelCatalog(snap.Workspace, config.AnnaHome()) + if err != nil { + return err + } + if err := validateRuntimeBinding(catalog, pluginID, "channel"); err != nil { + return err + } + if bindings.Channels == nil { + bindings.Channels = map[string]string{} + } + bindings.Channels[slot] = pluginID + + default: + return fmt.Errorf("kind must be tool or channel") + } + + if err := config.SaveRuntimePluginBindings(store, bindings); err != nil { + return err + } + + fmt.Printf("Bound %s %q to %s.\n", kind, slot, pluginID) + return nil +} + +func validateRuntimeBinding(catalog *pluginhost.Catalog, pluginID, kind string) error { + def, ok := catalog.Get(pluginID) + if !ok { + return fmt.Errorf("runtime plugin %q not found", pluginID) + } + if string(def.Manifest.Kind) != kind { + return fmt.Errorf("runtime plugin %q has kind %q, want %q", pluginID, def.Manifest.Kind, kind) + } + return nil +} + +func enabledChannels(ctx context.Context, store config.Store) (map[string]bool, error) { + rows, err := store.ListChannels(ctx) + if err != nil { + return nil, err + } + out := make(map[string]bool, len(rows)) + for _, ch := range rows { + out[ch.ID] = ch.Enabled + } + return out, nil +} + +func runtimePluginSource(def pluginhost.Definition) string { + if def.Manifest.Entrypoint == pluginhost.BuiltinEntrypoint { + return "bundled" + } + if def.ManifestPath == "" { + return "runtime" + } + return "installed" +} + +func contains(items []string, target string) bool { + for _, item := range items { + if item == target { + return true + } + } + return false +} + +func yesNo(v bool) string { + if v { + return "yes" + } + return "no" +} diff --git a/cmd/anna/plugin_runtime_test.go b/cmd/anna/plugin_runtime_test.go new file mode 100644 index 00000000..5a7d8f1f --- /dev/null +++ b/cmd/anna/plugin_runtime_test.go @@ -0,0 +1,177 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "testing" + + agenttool "github.com/vaayne/anna/internal/agent/tool" + "github.com/vaayne/anna/internal/config" +) + +var ( + annaPluginBinaryOnce sync.Once + annaPluginBinaryPath string + annaPluginBinaryErr error +) + +func buildAnnaPluginBinary(t *testing.T) string { + t.Helper() + + annaPluginBinaryOnce.Do(func() { + root, err := filepath.Abs(filepath.Join("..", "..")) + if err != nil { + annaPluginBinaryErr = err + return + } + dir, err := os.MkdirTemp("", "anna-plugin-bin-") + if err != nil { + annaPluginBinaryErr = err + return + } + binPath := filepath.Join(dir, "anna-plugin-test-bin") + cmd := exec.Command("go", "build", "-o", binPath, "./cmd/anna-plugin") + cmd.Dir = root + cmd.Env = os.Environ() + out, err := cmd.CombinedOutput() + if err != nil { + annaPluginBinaryErr = fmt.Errorf("build anna-plugin binary: %w: %s", err, string(out)) + return + } + annaPluginBinaryPath = binPath + }) + if annaPluginBinaryErr != nil { + t.Fatal(annaPluginBinaryErr) + } + return annaPluginBinaryPath +} + +func TestPluginToolRegistryExecuteReadWriteEdit(t *testing.T) { + t.Setenv("ANNA_HOME", t.TempDir()) + config.ResetAnnaHome() + t.Cleanup(config.ResetAnnaHome) + t.Setenv("ANNA_PLUGIN_ENTRYPOINT", buildAnnaPluginBinary(t)) + + dir := t.TempDir() + path := filepath.Join(dir, "note.txt") + if err := os.WriteFile(path, []byte("hello world"), 0o644); err != nil { + t.Fatal(err) + } + + reg := agenttool.NewRegistry("") + defer func() { _ = reg.Close() }() + + readResult, err := reg.Execute(context.Background(), "read", map[string]any{"file_path": path}) + if err != nil { + t.Fatalf("read execute: %v", err) + } + if !strings.Contains(readResult, "hello world") { + t.Fatalf("read result = %q, want file content", readResult) + } + + editResult, err := reg.Execute(context.Background(), "edit", map[string]any{ + "file_path": path, + "old_string": "world", + "new_string": "anna", + }) + if err != nil { + t.Fatalf("edit execute: %v", err) + } + if editResult == "" { + t.Fatal("expected non-empty edit result") + } + + updated, err := os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + if string(updated) != "hello anna" { + t.Fatalf("updated file = %q, want %q", string(updated), "hello anna") + } + + writePath := filepath.Join(dir, "write.txt") + writeResult, err := reg.Execute(context.Background(), "write", map[string]any{ + "file_path": writePath, + "content": "from plugin", + }) + if err != nil { + t.Fatalf("write execute: %v", err) + } + if writeResult == "" { + t.Fatal("expected non-empty write result") + } + + written, err := os.ReadFile(writePath) + if err != nil { + t.Fatal(err) + } + if string(written) != "from plugin" { + t.Fatalf("write file = %q, want %q", string(written), "from plugin") + } +} + +func TestPluginToolRegistryExecuteBashAndWebFetch(t *testing.T) { + t.Setenv("ANNA_HOME", t.TempDir()) + config.ResetAnnaHome() + t.Cleanup(config.ResetAnnaHome) + t.Setenv("ANNA_PLUGIN_ENTRYPOINT", buildAnnaPluginBinary(t)) + + workDir := t.TempDir() + reg := agenttool.NewRegistry(workDir) + defer func() { _ = reg.Close() }() + + bashResult, err := reg.Execute(context.Background(), "bash", map[string]any{"command": "pwd -P"}) + if err != nil { + t.Fatalf("bash execute: %v", err) + } + if !strings.Contains(bashResult, workDir) { + t.Fatalf("bash result = %q, want work dir %q", bashResult, workDir) + } + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html") + _, _ = w.Write([]byte("

Plugin Fetch

ok

")) + })) + defer srv.Close() + + fetchResult, err := reg.Execute(context.Background(), "webfetch", map[string]any{ + "url": srv.URL, + "format": "text", + }) + if err != nil { + t.Fatalf("webfetch execute: %v", err) + } + if !strings.Contains(fetchResult, "Plugin Fetch") { + t.Fatalf("webfetch result = %q, want fetched content", fetchResult) + } +} + +func TestPluginToolRegistrySandbox(t *testing.T) { + t.Setenv("ANNA_HOME", t.TempDir()) + config.ResetAnnaHome() + t.Cleanup(config.ResetAnnaHome) + t.Setenv("ANNA_PLUGIN_ENTRYPOINT", buildAnnaPluginBinary(t)) + + allowed := t.TempDir() + outside := t.TempDir() + + reg := agenttool.NewRegistry("", allowed) + defer func() { _ = reg.Close() }() + + _, err := reg.Execute(context.Background(), "read", map[string]any{ + "file_path": filepath.Join(outside, "secret.txt"), + }) + if err == nil { + t.Fatal("expected sandbox error") + } + if !strings.Contains(err.Error(), "sandbox") { + t.Fatalf("sandbox error = %v, want sandbox in error", err) + } +} diff --git a/docs/content/docs/features/plugin-system.md b/docs/content/docs/features/plugin-system.md index a4f8c8ea..c99b8932 100644 --- a/docs/content/docs/features/plugin-system.md +++ b/docs/content/docs/features/plugin-system.md @@ -4,7 +4,23 @@ title: Plugin System ## Overview -Anna supports JavaScript plugins that extend the assistant with custom tools and lifecycle hooks. Plugins run inside an embedded [QuickJS](https://bellard.org/quickjs/) runtime -- no external Node.js or npm required. +Anna now has two plugin paths: + +- **JavaScript plugins** for lightweight tools and lifecycle hooks +- **Runtime plugins** for subprocess-based tools and channels + +JavaScript plugins run inside an embedded [QuickJS](https://bellard.org/quickjs/) runtime -- no external Node.js or npm required. + +Runtime plugins run as separate processes and communicate with anna over a versioned stdio protocol. The first built-in runtime plugin targets are the core tools (`read`, `bash`, `edit`, `write`, `webfetch`) and the network channels (`telegram`, `qq`, `feishu`, `weixin`). + +Runtime bindings are slot-based. Rebinding `tool/read` or the `telegram` channel slot changes the implementation behind that slot without changing the rest of Anna's internal routing or stored channel configuration. + +The two systems are intentionally separate: + +- The `plugins` setting still stores JavaScript plugin entries. +- The `runtime_plugins` setting stores slot-to-plugin bindings for subprocess tools and channels. +- `anna plugin ...` manages JavaScript plugins. +- `anna plugin runtime ...` manages subprocess plugin bindings. A plugin is a single `.js` file that receives an `anna` host object and uses it to register tools, subscribe to lifecycle events, and access host APIs. @@ -44,10 +60,16 @@ anna plugin list # List configured plugins anna plugin add # Add a plugin anna plugin add --config key=val # Add with config values (repeatable) anna plugin remove # Remove a plugin (alias: rm) +anna plugin runtime list # List runtime tool/channel bindings +anna plugin runtime bind tool read tool/read +anna plugin runtime bind channel telegram channel/telegram +anna plugin runtime bind tool read --default ``` The `add` command writes the plugin entry into the `settings` table in the database (under the `"plugins"` key). The `remove` command accepts either the plugin name (filename without `.js`) or the full path. Both commands update the `settings` table directly. +`anna plugin runtime list` shows the effective subprocess plugin bindings for tool and channel slots, along with the resolved source and whether a channel is enabled. `anna plugin runtime bind ...` updates the `runtime_plugins` setting and lets you point a slot at a different runtime plugin ID. + ## Configuration Plugins are stored in the `settings` table under the key `"plugins"` as a JSON array. Each entry has: @@ -74,6 +96,23 @@ Example JSON structure stored in the settings table: Use the `anna plugin add` and `anna plugin remove` CLI commands to manage this list, or edit it through the admin panel. +### Runtime Plugin Bindings + +Subprocess plugin bindings live in the `settings` table under the key `"runtime_plugins"`: + +```json +{ + "tools": { + "read": "tool/read" + }, + "channels": { + "telegram": "channel/telegram" + } +} +``` + +Each tool or channel slot resolves to a runtime plugin ID. If a slot has no explicit override, anna falls back to the bundled plugin for that slot. + ## Writing Plugins A plugin file is executed inside an IIFE that receives the `anna` host object. All registration happens at load time -- there is no module system or `require`. @@ -202,6 +241,13 @@ Plugins run in a sandboxed QuickJS runtime with these restrictions: - **Concurrency**: All JS calls are serialized with a mutex since QuickJS is single-threaded. - **Tool name isolation**: Plugin tools cannot shadow built-in tools. +Runtime plugins have a different isolation model: + +- They run out of process, so a crash does not crash the main anna daemon. +- They are supervised and restarted by the host when appropriate. +- Their stderr is forwarded into anna's structured logs. +- Built-in channels and tools now use this path first, which makes later replacement possible without recompiling anna. + ## Examples ### Lifecycle Logger diff --git a/docs/content/docs/getting-started/configuration.md b/docs/content/docs/getting-started/configuration.md index 2e4dddc2..671e279a 100644 --- a/docs/content/docs/getting-started/configuration.md +++ b/docs/content/docs/getting-started/configuration.md @@ -20,6 +20,7 @@ A key-value store for global settings. Each row has a `key` (text) and a `value` | `compaction` | Compaction thresholds (max_tokens, keep_tail) | | `heartbeat` | Heartbeat polling toggle and interval | | `plugins` | JSON array of plugin definitions | +| `runtime_plugins` | JSON object of subprocess tool/channel bindings | | `models_cache` | Cached model list from providers | ### settings_providers @@ -145,6 +146,8 @@ Each platform stores its own JSON structure in the `config` column of `settings_ | Path | Purpose | Category | | -------------------------------------------- | ------------------------------------------- | -------- | | `~/.anna/anna.db` | SQLite database (config, memory, scheduler) | Data | +| `~/.anna/plugins/bundled/` | Bundled runtime plugin manifests | Data | +| `~/.anna/plugins/installed/` | User-installed runtime plugins | Data | | `~/.anna/workspaces/{agent-id}/skills/` | Per-agent installed skills | Data | | `~/.anna/workspaces/{agent-id}/anna.log` | Per-agent log file | Data | | `~/.anna/workspaces/{agent-id}/SOUL.md` | Optional soul/identity override | Data | @@ -192,3 +195,22 @@ Plugins are stored in the `settings` table under the `plugins` key as a JSON arr { "path": "/abs/path/notify.js", "config": { "webhook_url": "https://example.com" } } ] ``` + +## Runtime Plugin Bindings + +Subprocess plugin bindings are stored separately under the `runtime_plugins` key: + +```json +{ + "tools": { + "read": "tool/read", + "bash": "tool/bash" + }, + "channels": { + "telegram": "channel/telegram", + "qq": "channel/qq" + } +} +``` + +These bindings control which runtime plugin ID handles each built-in tool or channel slot. If a slot has no explicit override, anna falls back to the bundled first-party plugin for that slot. Use `anna plugin runtime list` to inspect the effective bindings and `anna plugin runtime bind ...` to change them. diff --git a/internal/agent/factory.go b/internal/agent/factory.go index f0efd444..21d0d843 100644 --- a/internal/agent/factory.go +++ b/internal/agent/factory.go @@ -61,17 +61,18 @@ func NewRunnerFactory(snap *config.Snapshot, extraTools []agenttool.Tool, plugin workDir := userDataDir return runner.NewGoRunner(ctx, runner.GoRunnerConfig{ - API: provID, - Model: modelID, - APIKey: creds.APIKey, - Workspace: snap.Workspace, - AnnaHome: config.AnnaHome(), - BaseURL: creds.BaseURL, - System: system, - ExtraTools: sessionTools, - PluginHooks: pluginHooks, - WorkDir: workDir, - UserDataDir: userDataDir, + API: provID, + Model: modelID, + APIKey: creds.APIKey, + Workspace: snap.Workspace, + AnnaHome: config.AnnaHome(), + BaseURL: creds.BaseURL, + System: system, + ExtraTools: sessionTools, + PluginHooks: pluginHooks, + WorkDir: workDir, + UserDataDir: userDataDir, + RuntimePlugins: snap.RuntimePlugins, }) }, nil default: diff --git a/internal/agent/plugin_entrypoint_test.go b/internal/agent/plugin_entrypoint_test.go new file mode 100644 index 00000000..f91537ea --- /dev/null +++ b/internal/agent/plugin_entrypoint_test.go @@ -0,0 +1,54 @@ +package agent + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "sync" + "testing" +) + +var ( + pluginBinaryOnce sync.Once + pluginBinaryPath string + pluginBinaryErr error +) + +func TestMain(m *testing.M) { + os.Exit(runTestsWithPluginBinary(m, filepath.Join("..", ".."))) +} + +func runTestsWithPluginBinary(m *testing.M, rootRel string) int { + pluginBinaryOnce.Do(func() { + root, err := filepath.Abs(rootRel) + if err != nil { + pluginBinaryErr = err + return + } + dir, err := os.MkdirTemp("", "anna-plugin-bin-") + if err != nil { + pluginBinaryErr = err + return + } + binPath := filepath.Join(dir, "anna-plugin-test-bin") + cmd := exec.Command("go", "build", "-o", binPath, "./cmd/anna-plugin") + cmd.Dir = root + cmd.Env = os.Environ() + out, err := cmd.CombinedOutput() + if err != nil { + pluginBinaryErr = fmt.Errorf("build anna binary: %w: %s", err, string(out)) + return + } + pluginBinaryPath = binPath + }) + if pluginBinaryErr != nil { + fmt.Fprintln(os.Stderr, pluginBinaryErr) + return 1 + } + if err := os.Setenv("ANNA_PLUGIN_ENTRYPOINT", pluginBinaryPath); err != nil { + fmt.Fprintln(os.Stderr, err) + return 1 + } + return m.Run() +} diff --git a/internal/agent/runner/gorunner.go b/internal/agent/runner/gorunner.go index c7c64ef7..72dcc99d 100644 --- a/internal/agent/runner/gorunner.go +++ b/internal/agent/runner/gorunner.go @@ -14,23 +14,25 @@ import ( "github.com/vaayne/anna/internal/ai/providers/anthropic" "github.com/vaayne/anna/internal/ai/providers/openai" openairesponse "github.com/vaayne/anna/internal/ai/providers/openai-response" + "github.com/vaayne/anna/internal/config" ) const maxToolIterations = 40 // GoRunnerConfig configures the Go runner. type GoRunnerConfig struct { - API string // provider key: "anthropic", "openai" - Model string // e.g. "claude-sonnet-4-20250514" - APIKey string - BaseURL string // optional provider base URL override - WorkDir string // working directory for tool execution - Workspace string // workspace dir for skills/memory (e.g. ~/.anna/workspace) - AnnaHome string // anna home directory (e.g. ~/.anna) - System string // optional system prompt override (bypasses default prompt building) - ExtraTools []tool.Tool // additional tools to register - PluginHooks engine.PluginHookRunner // optional plugin lifecycle hooks - UserDataDir string // per-user data directory for sandbox enforcement (empty = no sandbox) + API string // provider key: "anthropic", "openai" + Model string // e.g. "claude-sonnet-4-20250514" + APIKey string + BaseURL string // optional provider base URL override + WorkDir string // working directory for tool execution + Workspace string // workspace dir for skills/memory (e.g. ~/.anna/workspace) + AnnaHome string // anna home directory (e.g. ~/.anna) + System string // optional system prompt override (bypasses default prompt building) + ExtraTools []tool.Tool // additional tools to register + PluginHooks engine.PluginHookRunner // optional plugin lifecycle hooks + UserDataDir string // per-user data directory for sandbox enforcement (empty = no sandbox) + RuntimePlugins config.RuntimePluginBindings } // GoRunner implements Runner by calling LLM providers directly via Engine. @@ -77,7 +79,10 @@ func NewGoRunner(_ context.Context, cfg GoRunnerConfig) (*GoRunner, error) { eng := &engine.Engine{Providers: reg} model := ai.Model{API: cfg.API, Name: cfg.Model} - tools := tool.NewRegistry(cfg.WorkDir, cfg.UserDataDir) + tools, err := tool.NewRegistryWithBindings(cfg.WorkDir, cfg.RuntimePlugins, cfg.UserDataDir) + if err != nil { + return nil, fmt.Errorf("configure runtime tool plugins: %w", err) + } for _, t := range cfg.ExtraTools { tools.Register(t) } @@ -150,8 +155,13 @@ func (r *GoRunner) LastActivity() time.Time { return r.lastActivity } -// Close is a no-op for the Go runner. -func (r *GoRunner) Close() error { return nil } +// Close shuts down any subprocess-backed tools owned by the runner. +func (r *GoRunner) Close() error { + if r.tools != nil { + _ = r.tools.Close() + } + return nil +} // buildToolSet adapts tool.Registry to engine.ToolSet for Engine. func (r *GoRunner) buildToolSet() engine.ToolSet { diff --git a/internal/agent/runner/plugin_entrypoint_test.go b/internal/agent/runner/plugin_entrypoint_test.go new file mode 100644 index 00000000..796e06b9 --- /dev/null +++ b/internal/agent/runner/plugin_entrypoint_test.go @@ -0,0 +1,50 @@ +package runner + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "sync" + "testing" +) + +var ( + pluginBinaryOnce sync.Once + pluginBinaryPath string + pluginBinaryErr error +) + +func TestMain(m *testing.M) { + pluginBinaryOnce.Do(func() { + root, err := filepath.Abs(filepath.Join("..", "..", "..")) + if err != nil { + pluginBinaryErr = err + return + } + dir, err := os.MkdirTemp("", "anna-plugin-bin-") + if err != nil { + pluginBinaryErr = err + return + } + binPath := filepath.Join(dir, "anna-plugin-test-bin") + cmd := exec.Command("go", "build", "-o", binPath, "./cmd/anna-plugin") + cmd.Dir = root + cmd.Env = os.Environ() + out, err := cmd.CombinedOutput() + if err != nil { + pluginBinaryErr = fmt.Errorf("build anna binary: %w: %s", err, string(out)) + return + } + pluginBinaryPath = binPath + }) + if pluginBinaryErr != nil { + fmt.Fprintln(os.Stderr, pluginBinaryErr) + os.Exit(1) + } + if err := os.Setenv("ANNA_PLUGIN_ENTRYPOINT", pluginBinaryPath); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + os.Exit(m.Run()) +} diff --git a/internal/agent/tool/plugin_entrypoint_test.go b/internal/agent/tool/plugin_entrypoint_test.go new file mode 100644 index 00000000..7ee199f2 --- /dev/null +++ b/internal/agent/tool/plugin_entrypoint_test.go @@ -0,0 +1,50 @@ +package tool + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "sync" + "testing" +) + +var ( + pluginBinaryOnce sync.Once + pluginBinaryPath string + pluginBinaryErr error +) + +func TestMain(m *testing.M) { + pluginBinaryOnce.Do(func() { + root, err := filepath.Abs(filepath.Join("..", "..", "..")) + if err != nil { + pluginBinaryErr = err + return + } + dir, err := os.MkdirTemp("", "anna-plugin-bin-") + if err != nil { + pluginBinaryErr = err + return + } + binPath := filepath.Join(dir, "anna-plugin-test-bin") + cmd := exec.Command("go", "build", "-o", binPath, "./cmd/anna-plugin") + cmd.Dir = root + cmd.Env = os.Environ() + out, err := cmd.CombinedOutput() + if err != nil { + pluginBinaryErr = fmt.Errorf("build anna binary: %w: %s", err, string(out)) + return + } + pluginBinaryPath = binPath + }) + if pluginBinaryErr != nil { + fmt.Fprintln(os.Stderr, pluginBinaryErr) + os.Exit(1) + } + if err := os.Setenv("ANNA_PLUGIN_ENTRYPOINT", pluginBinaryPath); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + os.Exit(m.Run()) +} diff --git a/internal/agent/tool/plugin_runtime.go b/internal/agent/tool/plugin_runtime.go new file mode 100644 index 00000000..9f0310b0 --- /dev/null +++ b/internal/agent/tool/plugin_runtime.go @@ -0,0 +1,109 @@ +package tool + +import ( + "fmt" + "os" + + "github.com/vaayne/anna/internal/pluginapi" + "github.com/vaayne/anna/internal/pluginhost" + "github.com/vaayne/anna/internal/toolspec" +) + +const builtinPluginVersion = "1.0.0" + +var builtinToolNames = []string{"read", "bash", "edit", "write", "webfetch"} + +func BuiltinToolNames() []string { + names := make([]string, len(builtinToolNames)) + copy(names, builtinToolNames) + return names +} + +func BuiltinToolDefinitions(workDir, userDataDir string) []pluginhost.Definition { + defs := make([]pluginhost.Definition, 0, len(builtinToolNames)) + for _, name := range builtinToolNames { + def, _, err := BuiltinToolPlugin(name, workDir, userDataDir) + if err != nil { + continue + } + defs = append(defs, def) + } + return defs +} + +// BuiltinToolPlugin builds both the subprocess manifest and the local runtime +// tool for a built-in tool name. +func BuiltinToolPlugin(name, workDir, userDataDir string) (pluginhost.Definition, Tool, error) { + cwd, err := os.Getwd() + if err != nil { + cwd = "" + } + var runtime Tool + var def toolspec.Definition + var sandbox string + bashDir := workDir + if userDataDir != "" { + sandbox = userDataDir + bashDir = userDataDir + } + + switch name { + case "read": + runtime = wrapWithSandbox(&ReadTool{}, sandbox, "file_path") + def = runtime.Definition() + case "bash": + runtime = &BashTool{workDir: bashDir} + def = runtime.Definition() + case "edit": + runtime = wrapWithSandbox(&EditTool{}, sandbox, "file_path") + def = runtime.Definition() + case "write": + runtime = wrapWithSandbox(&WriteTool{}, sandbox, "file_path") + def = runtime.Definition() + case "webfetch": + runtime = NewWebFetchTool() + def = runtime.Definition() + default: + return pluginhost.Definition{}, nil, fmt.Errorf("unknown builtin tool plugin: %s", name) + } + + manifest := pluginapi.Manifest{ + Name: name, + Version: builtinPluginVersion, + Kind: pluginapi.KindTool, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: pluginhost.BuiltinEntrypoint, + Args: []string{ + "tool", + name, + "--work-dir", + workDir, + }, + Capabilities: []pluginapi.Capability{ + pluginapi.CapabilityToolCall, + pluginapi.CapabilityHealthCheck, + pluginapi.CapabilityGracefulShutdown, + }, + Tool: toolSpecFrom(def), + Metadata: map[string]any{ + "work_dir": workDir, + "user_data_dir": userDataDir, + }, + } + if userDataDir != "" { + manifest.Args = append(manifest.Args, "--user-data-dir", userDataDir) + } + + return pluginhost.Definition{ + Manifest: manifest, + RootDir: cwd, + }, runtime, nil +} + +func toolSpecFrom(def toolspec.Definition) *pluginapi.ToolSpec { + return &pluginapi.ToolSpec{ + Name: def.Name, + Description: def.Description, + InputSchema: def.InputSchema, + } +} diff --git a/internal/agent/tool/plugin_tool.go b/internal/agent/tool/plugin_tool.go new file mode 100644 index 00000000..af628dda --- /dev/null +++ b/internal/agent/tool/plugin_tool.go @@ -0,0 +1,63 @@ +package tool + +import ( + "context" + "fmt" + "log/slog" + + "github.com/vaayne/anna/internal/pluginapi" + "github.com/vaayne/anna/internal/pluginhost" + "github.com/vaayne/anna/internal/toolspec" +) + +type pluginTool struct { + def pluginhost.Definition + supervisor *pluginhost.Supervisor +} + +func newPluginTool(def pluginhost.Definition) Tool { + return &pluginTool{ + def: def, + supervisor: pluginhost.NewSupervisor(def, pluginhost.SupervisorOptions{Logger: slog.Default()}), + } +} + +func (t *pluginTool) Definition() toolspec.Definition { + if t.def.Manifest.Tool != nil { + return toolspec.Definition{ + Name: t.def.Manifest.Tool.Name, + Description: t.def.Manifest.Tool.Description, + InputSchema: t.def.Manifest.Tool.InputSchema, + } + } + return toolspec.Definition{ + Name: t.def.Manifest.Name, + Description: t.def.Manifest.Description, + } +} + +func (t *pluginTool) Execute(ctx context.Context, args map[string]any) (string, error) { + client, err := t.supervisor.EnsureHealthy(ctx) + if err != nil { + return "", fmt.Errorf("plugin tool %s: %w", t.def.Manifest.Name, err) + } + + var resp pluginapi.ToolCallResponse + if err := client.Request(ctx, "call_tool", pluginapi.ToolCallRequest{ + Name: t.def.Manifest.Name, + Arguments: args, + }, &resp); err != nil { + return "", fmt.Errorf("plugin tool %s: %w", t.def.Manifest.Name, err) + } + if resp.Error != "" { + if resp.Output != "" { + return resp.Output, fmt.Errorf("plugin tool %s: %s", t.def.Manifest.Name, resp.Error) + } + return "", fmt.Errorf("plugin tool %s: %s", t.def.Manifest.Name, resp.Error) + } + return resp.Output, nil +} + +func (t *pluginTool) Close() error { + return t.supervisor.Close() +} diff --git a/internal/agent/tool/runtime_bindings_test.go b/internal/agent/tool/runtime_bindings_test.go new file mode 100644 index 00000000..fb3e5d1e --- /dev/null +++ b/internal/agent/tool/runtime_bindings_test.go @@ -0,0 +1,151 @@ +package tool_test + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "testing" + + agenttool "github.com/vaayne/anna/internal/agent/tool" + "github.com/vaayne/anna/internal/config" + "github.com/vaayne/anna/internal/pluginapi" +) + +func TestNewRegistryWithBindingsUsesRuntimePluginOverride(t *testing.T) { + home := t.TempDir() + t.Setenv("ANNA_HOME", home) + + pluginDir := filepath.Join(config.InstalledPluginsPath(), "replacement-read", "1.0.0") + if err := os.MkdirAll(pluginDir, 0o755); err != nil { + t.Fatal(err) + } + + entrypoint := filepath.Join(pluginDir, "helper.sh") + script := `#!/bin/sh +while IFS= read -r line; do + id=$(printf '%s\n' "$line" | sed -n 's/.*"id":"\([^"]*\)".*/\1/p') + method=$(printf '%s\n' "$line" | sed -n 's/.*"method":"\([^"]*\)".*/\1/p') + case "$method" in + handshake) + printf '{"id":"%s","type":"response","result":{"protocol_version":"anna-plugin/v1","name":"replacement-read","version":"1.0.0","kind":"tool","capabilities":["tool.call","health.check","shutdown.graceful"],"tool":{"name":"read","description":"replacement read","input_schema":{"type":"object"}}}}\n' "$id" + ;; + health) + printf '{"id":"%s","type":"response","result":{"ok":true}}\n' "$id" + ;; + call_tool) + printf '{"id":"%s","type":"response","result":{"output":"replacement read"}}\n' "$id" + ;; + shutdown) + printf '{"id":"%s","type":"response","result":{}}\n' "$id" + exit 0 + ;; + *) + printf '{"id":"%s","type":"response","error":{"code":"unknown_method","message":"%s"}}\n' "$id" "$method" + ;; + esac +done +` + if err := os.WriteFile(entrypoint, []byte(script), 0o755); err != nil { + t.Fatal(err) + } + + manifest := pluginapi.Manifest{ + Name: "replacement-read", + Version: "1.0.0", + Kind: pluginapi.KindTool, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: "helper.sh", + Tool: &pluginapi.ToolSpec{ + Name: "read", + Description: "replacement read", + InputSchema: map[string]any{"type": "object"}, + }, + Capabilities: []pluginapi.Capability{ + pluginapi.CapabilityToolCall, + pluginapi.CapabilityHealthCheck, + pluginapi.CapabilityGracefulShutdown, + }, + } + data, err := json.MarshalIndent(manifest, "", " ") + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(pluginDir, "plugin.json"), data, 0o644); err != nil { + t.Fatal(err) + } + + sandbox := t.TempDir() + allowedPath := filepath.Join(sandbox, "note.txt") + if err := os.WriteFile(allowedPath, []byte("ignored"), 0o644); err != nil { + t.Fatal(err) + } + + bindings := config.DefaultRuntimePluginBindings() + bindings.Tools["read"] = "tool/replacement-read" + + reg, err := agenttool.NewRegistryWithBindings(t.TempDir(), bindings, sandbox) + if err != nil { + t.Fatalf("NewRegistryWithBindings: %v", err) + } + defer func() { _ = reg.Close() }() + + got, err := reg.Execute(context.Background(), "read", map[string]any{"file_path": allowedPath}) + if err != nil { + t.Fatalf("Execute(read): %v", err) + } + if got != "replacement read" { + t.Fatalf("Execute(read) = %q, want replacement output", got) + } +} + +func TestNewRegistryWithBindingsRejectsWrongToolName(t *testing.T) { + home := t.TempDir() + t.Setenv("ANNA_HOME", home) + + pluginDir := filepath.Join(config.InstalledPluginsPath(), "bad-read", "1.0.0") + if err := os.MkdirAll(pluginDir, 0o755); err != nil { + t.Fatal(err) + } + + entrypoint := filepath.Join(pluginDir, "helper.sh") + if err := os.WriteFile(entrypoint, []byte("#!/bin/sh\nexit 0\n"), 0o755); err != nil { + t.Fatal(err) + } + + manifest := pluginapi.Manifest{ + Name: "bad-read", + Version: "1.0.0", + Kind: pluginapi.KindTool, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: "helper.sh", + Tool: &pluginapi.ToolSpec{ + Name: "other-tool", + Description: "wrong replacement", + InputSchema: map[string]any{"type": "object"}, + }, + Capabilities: []pluginapi.Capability{ + pluginapi.CapabilityToolCall, + pluginapi.CapabilityHealthCheck, + pluginapi.CapabilityGracefulShutdown, + }, + } + data, err := json.MarshalIndent(manifest, "", " ") + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(pluginDir, "plugin.json"), data, 0o644); err != nil { + t.Fatal(err) + } + + bindings := config.DefaultRuntimePluginBindings() + bindings.Tools["read"] = "tool/bad-read" + + _, err = agenttool.NewRegistryWithBindings(t.TempDir(), bindings, t.TempDir()) + if err == nil { + t.Fatal("expected binding error") + } + if got := err.Error(); got != `tool read bound to plugin tool/bad-read exposing tool "other-tool"` { + t.Fatalf("NewRegistryWithBindings error = %q", got) + } +} diff --git a/internal/agent/tool/sandbox.go b/internal/agent/tool/sandbox.go index 153df436..7e8ce858 100644 --- a/internal/agent/tool/sandbox.go +++ b/internal/agent/tool/sandbox.go @@ -31,6 +31,13 @@ func (s *sandboxTool) Execute(ctx context.Context, args map[string]any) (string, return s.inner.Execute(ctx, args) } +func (s *sandboxTool) Close() error { + if c, ok := s.inner.(closeableTool); ok { + return c.Close() + } + return nil +} + // wrapWithSandbox returns a sandbox-wrapped tool if allowedDir is non-empty. // Otherwise it returns the original tool unchanged. func wrapWithSandbox(t Tool, allowedDir, pathKey string) Tool { diff --git a/internal/agent/tool/tool.go b/internal/agent/tool/tool.go index 96911e09..d5958a0a 100644 --- a/internal/agent/tool/tool.go +++ b/internal/agent/tool/tool.go @@ -4,9 +4,12 @@ import ( "context" "fmt" "log/slog" + "os" "github.com/vaayne/anna/internal/config" "github.com/vaayne/anna/internal/embedded" + "github.com/vaayne/anna/internal/pluginapi" + "github.com/vaayne/anna/internal/pluginhost" "github.com/vaayne/anna/internal/toolspec" ) @@ -21,11 +24,43 @@ type Registry struct { tools map[string]Tool } +type closeableTool interface { + Close() error +} + // NewRegistry creates a registry with the default built-in tools. -// When userDataDir is non-empty, file tools (read, write, edit) are wrapped -// with sandbox validation that restricts paths to the user's data directory. -// The bash tool uses userDataDir as its working directory when set. +// Built-ins are exposed through subprocess plugin wrappers so the runner can +// exercise the same protocol path as later third-party plugins. func NewRegistry(workDir string, userDataDir ...string) *Registry { + reg, err := NewRegistryWithBindings(workDir, config.DefaultRuntimePluginBindings(), userDataDir...) + if err == nil && reg != nil { + return reg + } + slog.Warn("failed to configure runtime plugin bindings, falling back to builtin tool bindings", "error", err) + + var sandbox string + if len(userDataDir) > 0 { + sandbox = userDataDir[0] + } + fallback := &Registry{tools: make(map[string]Tool)} + for _, name := range BuiltinToolNames() { + def, _, toolErr := BuiltinToolPlugin(name, workDir, sandbox) + if toolErr != nil { + slog.Warn("failed to configure fallback builtin tool plugin", "tool", name, "error", toolErr) + continue + } + t := newPluginTool(def) + switch name { + case "read", "edit", "write": + fallback.Register(wrapWithSandbox(t, sandbox, "file_path")) + default: + fallback.Register(t) + } + } + return fallback +} + +func NewRegistryWithBindings(workDir string, bindings config.RuntimePluginBindings, userDataDir ...string) (*Registry, error) { if err := embedded.EnsureTools(config.AnnaHome()); err != nil { slog.Warn("failed to extract embedded tools", "error", err) } @@ -35,19 +70,35 @@ func NewRegistry(workDir string, userDataDir ...string) *Registry { sandbox = userDataDir[0] } - // Use user data dir as bash work dir when available. - bashDir := workDir - if sandbox != "" { - bashDir = sandbox + catalog, err := LoadCatalog(workDir, sandbox) + if err != nil { + return nil, err } r := &Registry{tools: make(map[string]Tool)} - r.Register(wrapWithSandbox(&ReadTool{}, sandbox, "file_path")) - r.Register(&BashTool{workDir: bashDir}) - r.Register(wrapWithSandbox(&EditTool{}, sandbox, "file_path")) - r.Register(wrapWithSandbox(&WriteTool{}, sandbox, "file_path")) - r.Register(NewWebFetchTool()) - return r + for _, name := range BuiltinToolNames() { + pluginID := bindings.ToolBinding(name) + def, ok := catalog.Get(pluginID) + if !ok { + slog.Warn("tool binding not found in runtime plugin catalog", "tool", name, "plugin", pluginID) + continue + } + if def.Manifest.Kind != pluginapi.KindTool { + return nil, fmt.Errorf("tool %s bound to non-tool plugin %s", name, pluginID) + } + if def.Manifest.Tool == nil || def.Manifest.Tool.Name != name { + return nil, fmt.Errorf("tool %s bound to plugin %s exposing tool %q", name, pluginID, toolName(def)) + } + + t := newPluginTool(def) + switch name { + case "read", "edit", "write": + r.Register(wrapWithSandbox(t, sandbox, "file_path")) + default: + r.Register(t) + } + } + return r, nil } // BuiltinNames returns the names of all currently registered tools. @@ -87,3 +138,41 @@ func (r *Registry) Execute(ctx context.Context, name string, args map[string]any } return t.Execute(ctx, args) } + +// Close shuts down any tools that expose a Close method. +func (r *Registry) Close() error { + var lastErr error + for _, t := range r.tools { + if c, ok := t.(closeableTool); ok { + if err := c.Close(); err != nil { + lastErr = err + } + } + } + return lastErr +} + +func LoadCatalog(workDir, userDataDir string) (*pluginhost.Catalog, error) { + roots := []string{} + for _, root := range []string{config.BundledPluginsPath(), config.InstalledPluginsPath()} { + if _, err := os.Stat(root); err == nil { + roots = append(roots, root) + } + } + + catalog, err := pluginhost.Discover(roots...) + if err != nil { + return nil, err + } + if err := catalog.Merge(BuiltinToolDefinitions(workDir, userDataDir)...); err != nil { + return nil, err + } + return catalog, nil +} + +func toolName(def pluginhost.Definition) string { + if def.Manifest.Tool == nil { + return "" + } + return def.Manifest.Tool.Name +} diff --git a/internal/agent/tool/tool_test.go b/internal/agent/tool/tool_test.go index 0a238ba2..09117e05 100644 --- a/internal/agent/tool/tool_test.go +++ b/internal/agent/tool/tool_test.go @@ -11,8 +11,26 @@ import ( "testing" readability "codeberg.org/readeck/go-readability/v2" + "github.com/vaayne/anna/internal/toolspec" ) +type closeRecorderTool struct { + closed bool +} + +func (t *closeRecorderTool) Definition() toolspec.Definition { + return toolspec.Definition{Name: "close-recorder"} +} + +func (t *closeRecorderTool) Execute(context.Context, map[string]any) (string, error) { + return "", nil +} + +func (t *closeRecorderTool) Close() error { + t.closed = true + return nil +} + func TestRegistryDefinitions(t *testing.T) { reg := NewRegistry("") defs := reg.Definitions() @@ -39,6 +57,22 @@ func TestRegistryExecuteUnknown(t *testing.T) { } } +func TestSandboxToolCloseDelegates(t *testing.T) { + inner := &closeRecorderTool{} + wrapped := wrapWithSandbox(inner, t.TempDir(), "file_path") + + closer, ok := wrapped.(closeableTool) + if !ok { + t.Fatal("wrapped sandbox tool should expose Close") + } + if err := closer.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + if !inner.closed { + t.Fatal("expected sandbox wrapper to delegate Close to inner tool") + } +} + func TestReadTool(t *testing.T) { dir := t.TempDir() path := filepath.Join(dir, "test.txt") diff --git a/internal/auth/linkcode.go b/internal/auth/linkcode.go index 8b50815e..ed003d99 100644 --- a/internal/auth/linkcode.go +++ b/internal/auth/linkcode.go @@ -1,8 +1,12 @@ package auth import ( + "context" "crypto/rand" + "database/sql" "encoding/hex" + "fmt" + "log/slog" "strings" "sync" "time" @@ -23,11 +27,12 @@ type linkCodeEntry struct { ExpireAt time.Time } -// LinkCodeStore manages in-memory link codes for channel account linking. -// Codes are single-use and expire after 5 minutes. Not persisted to DB; -// restart clears all pending codes (acceptable for MVP). +// LinkCodeStore manages single-use link codes for channel account linking. +// Codes expire after 5 minutes. The default constructor keeps them in-memory; +// the shared constructor persists them to the Anna DB for cross-process use. type LinkCodeStore struct { codes sync.Map // string -> linkCodeEntry + db *sql.DB } // NewLinkCodeStore creates a new link code store. @@ -35,10 +40,32 @@ func NewLinkCodeStore() *LinkCodeStore { return &LinkCodeStore{} } +// NewSharedLinkCodeStore creates a link code store backed by the shared Anna DB +// so admin and channel subprocesses can exchange codes across processes. +func NewSharedLinkCodeStore(ctx context.Context, db *sql.DB) (*LinkCodeStore, error) { + if db == nil { + return nil, fmt.Errorf("link code store: nil db") + } + + store := &LinkCodeStore{db: db} + if err := store.ensureSchema(ctx); err != nil { + return nil, err + } + return store, nil +} + // Generate creates a new 6-character alphanumeric link code for the given // user and platform. Returns the code string. func (s *LinkCodeStore) Generate(userID int64, platform string) string { code := randomAlphanumeric(linkCodeLength) + if s.db != nil { + if err := s.generateShared(context.Background(), code, userID, platform); err != nil { + slog.Error("link code: persist generate failed", "platform", platform, "user_id", userID, "error", err) + return code + } + return code + } + s.codes.Store(code, linkCodeEntry{ UserID: userID, Platform: platform, @@ -52,6 +79,10 @@ func (s *LinkCodeStore) Generate(userID int64, platform string) string { // Returns (0, "", false) if the code is invalid or expired. func (s *LinkCodeStore) Consume(code string) (int64, string, bool) { code = strings.ToUpper(strings.TrimSpace(code)) + if s.db != nil { + return s.consumeShared(context.Background(), code) + } + val, ok := s.codes.LoadAndDelete(code) if !ok { return 0, "", false @@ -94,3 +125,68 @@ func randomAlphanumeric(n int) string { func isAlphanumeric(c rune) bool { return (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') } + +func (s *LinkCodeStore) ensureSchema(ctx context.Context) error { + const stmt = `CREATE TABLE IF NOT EXISTS auth_link_codes ( + code TEXT PRIMARY KEY, + user_id INTEGER NOT NULL, + platform TEXT NOT NULL, + expire_at INTEGER NOT NULL + )` + if _, err := s.db.ExecContext(ctx, stmt); err != nil { + return fmt.Errorf("link code store: ensure schema: %w", err) + } + return nil +} + +func (s *LinkCodeStore) generateShared(ctx context.Context, code string, userID int64, platform string) error { + if err := s.deleteExpired(ctx); err != nil { + return err + } + + const stmt = `INSERT OR REPLACE INTO auth_link_codes (code, user_id, platform, expire_at) + VALUES (?, ?, ?, ?)` + _, err := s.db.ExecContext(ctx, stmt, code, userID, platform, time.Now().Add(linkCodeTTL).Unix()) + if err != nil { + return fmt.Errorf("link code store: insert code: %w", err) + } + return nil +} + +func (s *LinkCodeStore) consumeShared(ctx context.Context, code string) (int64, string, bool) { + if err := s.deleteExpired(ctx); err != nil { + slog.Error("link code: purge expired failed", "error", err) + return 0, "", false + } + + const stmt = `DELETE FROM auth_link_codes + WHERE code = ? + RETURNING user_id, platform, expire_at` + + var ( + userID int64 + platform string + expireAt int64 + ) + err := s.db.QueryRowContext(ctx, stmt, code).Scan(&userID, &platform, &expireAt) + if err == sql.ErrNoRows { + return 0, "", false + } + if err != nil { + slog.Error("link code: consume failed", "code", code, "error", err) + return 0, "", false + } + if time.Now().After(time.Unix(expireAt, 0)) { + return 0, "", false + } + return userID, platform, true +} + +func (s *LinkCodeStore) deleteExpired(ctx context.Context) error { + const stmt = `DELETE FROM auth_link_codes WHERE expire_at <= ?` + _, err := s.db.ExecContext(ctx, stmt, time.Now().Unix()) + if err != nil { + return fmt.Errorf("link code store: delete expired: %w", err) + } + return nil +} diff --git a/internal/auth/linkcode_test.go b/internal/auth/linkcode_test.go index e47a91b4..dae096ba 100644 --- a/internal/auth/linkcode_test.go +++ b/internal/auth/linkcode_test.go @@ -1,10 +1,13 @@ package auth_test import ( + "context" + "path/filepath" "strings" "testing" "github.com/vaayne/anna/internal/auth" + appdb "github.com/vaayne/anna/internal/db" ) func TestLinkCodeGenerate(t *testing.T) { @@ -46,6 +49,41 @@ func TestLinkCodeConsume(t *testing.T) { } } +func TestSharedLinkCodeConsumeAcrossStores(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "linkcodes.db") + db, err := appdb.OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + t.Cleanup(func() { _ = db.Close() }) + db2, err := appdb.OpenDB(dbPath) + if err != nil { + t.Fatalf("OpenDB second handle: %v", err) + } + t.Cleanup(func() { _ = db2.Close() }) + + issuer, err := auth.NewSharedLinkCodeStore(context.Background(), db) + if err != nil { + t.Fatalf("NewSharedLinkCodeStore issuer: %v", err) + } + consumer, err := auth.NewSharedLinkCodeStore(context.Background(), db2) + if err != nil { + t.Fatalf("NewSharedLinkCodeStore consumer: %v", err) + } + + code := issuer.Generate(42, "telegram") + userID, platform, ok := consumer.Consume(code) + if !ok { + t.Fatal("expected Consume to succeed across store instances") + } + if userID != 42 { + t.Errorf("userID = %d, want 42", userID) + } + if platform != "telegram" { + t.Errorf("platform = %q, want telegram", platform) + } +} + func TestLinkCodeConsumeCaseInsensitive(t *testing.T) { store := auth.NewLinkCodeStore() diff --git a/internal/channel/config.go b/internal/channel/config.go new file mode 100644 index 00000000..8696513b --- /dev/null +++ b/internal/channel/config.go @@ -0,0 +1,64 @@ +package channel + +import ( + "context" + "encoding/json" + "log/slog" + + "github.com/vaayne/anna/internal/config" +) + +type TelegramConfig struct { + Token string `json:"token"` + ChannelID string `json:"channel_id"` + GroupMode string `json:"group_mode"` + EnableNotify bool `json:"enable_notify"` +} + +type QQConfig struct { + AppID string `json:"app_id"` + AppSecret string `json:"app_secret"` + GroupMode string `json:"group_mode"` + EnableNotify bool `json:"enable_notify"` +} + +type FeishuConfig struct { + AppID string `json:"app_id"` + AppSecret string `json:"app_secret"` + EncryptKey string `json:"encrypt_key"` + VerificationToken string `json:"verification_token"` + GroupMode string `json:"group_mode"` + Groups map[string]FeishuGroup `json:"groups"` + EnableNotify bool `json:"enable_notify"` +} + +type FeishuGroup struct { + GroupMode string `json:"group_mode"` + SystemPrompt string `json:"system_prompt"` + ToolAllow []string `json:"tool_allow"` + ToolDeny []string `json:"tool_deny"` +} + +type WeixinConfig struct { + BotToken string `json:"bot_token"` + BaseURL string `json:"base_url"` + BotID string `json:"bot_id"` + UserID string `json:"user_id"` + EnableNotify bool `json:"enable_notify"` +} + +// LoadConfig loads a channel's JSON config from the store and deserializes it +// into the given type. Returns nil if the channel is missing or the payload +// cannot be decoded. +func LoadConfig[T any](store config.Store, channelID string) *T { + ch, err := store.GetChannel(context.Background(), channelID) + if err != nil { + return nil + } + var cfg T + if err := json.Unmarshal([]byte(ch.Config), &cfg); err != nil { + slog.Warn("failed to parse channel config", "channel", channelID, "error", err) + return nil + } + return &cfg +} diff --git a/internal/channel/plugin_runtime.go b/internal/channel/plugin_runtime.go new file mode 100644 index 00000000..4de9e604 --- /dev/null +++ b/internal/channel/plugin_runtime.go @@ -0,0 +1,91 @@ +package channel + +import ( + "fmt" + "path/filepath" + + "github.com/vaayne/anna/internal/pluginapi" + "github.com/vaayne/anna/internal/pluginhost" +) + +const builtinChannelPluginVersion = "1.0.0" + +var builtinChannelNames = []string{"telegram", "qq", "feishu", "weixin"} + +func BuiltinChannelNames() []string { + names := make([]string, len(builtinChannelNames)) + copy(names, builtinChannelNames) + return names +} + +func BuiltinChannelDefinitions(workDir, userDataDir string) []pluginhost.Definition { + defs := make([]pluginhost.Definition, 0, len(builtinChannelNames)) + for _, name := range builtinChannelNames { + def, err := BuiltinChannelDefinition(name, workDir, userDataDir) + if err != nil { + continue + } + defs = append(defs, def) + } + return defs +} + +func BuiltinChannelDefinition(name, workDir, userDataDir string) (pluginhost.Definition, error) { + var description string + switch name { + case "telegram": + description = "Telegram channel plugin" + case "qq": + description = "QQ channel plugin" + case "feishu": + description = "Feishu channel plugin" + case "weixin": + description = "Weixin channel plugin" + default: + return pluginhost.Definition{}, fmt.Errorf("unknown builtin channel plugin: %s", name) + } + + manifest := pluginapi.Manifest{ + Name: name, + Version: builtinChannelPluginVersion, + Kind: pluginapi.KindChannel, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: pluginhost.BuiltinEntrypoint, + Args: []string{ + "channel", + name, + }, + Description: description, + Capabilities: []pluginapi.Capability{ + pluginapi.CapabilityChannelStart, + pluginapi.CapabilityChannelStop, + pluginapi.CapabilityChannelNotify, + pluginapi.CapabilityChannelInbound, + pluginapi.CapabilityHealthCheck, + pluginapi.CapabilityGracefulShutdown, + }, + Metadata: map[string]any{ + "work_dir": workDir, + "user_data_dir": userDataDir, + }, + } + if workDir != "" { + manifest.Args = append(manifest.Args, "--work-dir", workDir) + } + if userDataDir != "" { + manifest.Args = append(manifest.Args, "--user-data-dir", userDataDir) + } + + rootDir := workDir + if rootDir == "" { + rootDir = userDataDir + } + if rootDir == "" { + rootDir = "." + } + + return pluginhost.Definition{ + Manifest: manifest, + RootDir: filepath.Clean(rootDir), + }, nil +} diff --git a/internal/config/dbstore.go b/internal/config/dbstore.go index 1b488ab6..201f447e 100644 --- a/internal/config/dbstore.go +++ b/internal/config/dbstore.go @@ -327,16 +327,17 @@ func (s *DBStore) Snapshot(ctx context.Context, agentID string) (*Snapshot, erro defaultCreds := providers[defaultProvID] snap := &Snapshot{ - AgentID: agentID, - Provider: defaultProvID, - Model: ag.Model, - ModelStrong: ag.ModelStrong, - ModelFast: ag.ModelFast, - Workspace: ag.Workspace, - APIKey: defaultCreds.APIKey, - BaseURL: defaultCreds.BaseURL, - SystemPrompt: ag.SystemPrompt, - Providers: providers, + AgentID: agentID, + Provider: defaultProvID, + Model: ag.Model, + ModelStrong: ag.ModelStrong, + ModelFast: ag.ModelFast, + Workspace: ag.Workspace, + APIKey: defaultCreds.APIKey, + BaseURL: defaultCreds.BaseURL, + SystemPrompt: ag.SystemPrompt, + Providers: providers, + RuntimePlugins: DefaultRuntimePluginBindings(), } // Load settings. @@ -355,6 +356,9 @@ func (s *DBStore) Snapshot(ctx context.Context, agentID string) (*Snapshot, erro if val, err := s.GetSetting(ctx, "plugins"); err == nil && val != "" { _ = json.Unmarshal([]byte(val), &snap.Plugins) } + if val, err := s.GetSetting(ctx, runtimePluginsSettingKey); err == nil && val != "" { + _ = json.Unmarshal([]byte(val), &snap.RuntimePlugins) + } // Apply defaults. if snap.Runner.Type == "" { diff --git a/internal/config/dbstore_test.go b/internal/config/dbstore_test.go index 4353b378..ea09f17b 100644 --- a/internal/config/dbstore_test.go +++ b/internal/config/dbstore_test.go @@ -366,6 +366,8 @@ func TestSnapshot(t *testing.T) { _ = store.SetSetting(ctx, "runner", `{"type":"go","idle_timeout":30}`) _ = store.SetSetting(ctx, "compaction", `{"enabled":true}`) + _ = store.SetSetting(ctx, "plugins", `[{"path":"/tmp/example.js","config":{"mode":"test"}}]`) + _ = store.SetSetting(ctx, "runtime_plugins", `{"tools":{"read":"tool/custom-read"},"channels":{"telegram":"channel/custom-telegram"}}`) snap, err := store.Snapshot(ctx, "anna") if err != nil { @@ -393,6 +395,15 @@ func TestSnapshot(t *testing.T) { if snap.Runner.IdleTimeout != 30 { t.Errorf("Runner.IdleTimeout = %d", snap.Runner.IdleTimeout) } + if len(snap.Plugins) != 1 || snap.Plugins[0].Path != "/tmp/example.js" { + t.Errorf("Plugins = %+v", snap.Plugins) + } + if got := snap.RuntimePlugins.ToolBinding("read"); got != "tool/custom-read" { + t.Errorf("RuntimePlugins.ToolBinding(read) = %q", got) + } + if got := snap.RuntimePlugins.ChannelBinding("telegram"); got != "channel/custom-telegram" { + t.Errorf("RuntimePlugins.ChannelBinding(telegram) = %q", got) + } } func TestSnapshotDefaults(t *testing.T) { diff --git a/internal/config/plugins.go b/internal/config/plugins.go new file mode 100644 index 00000000..045017e6 --- /dev/null +++ b/internal/config/plugins.go @@ -0,0 +1,15 @@ +package config + +import "path/filepath" + +func PluginsPath() string { + return filepath.Join(AnnaHome(), "plugins") +} + +func BundledPluginsPath() string { + return filepath.Join(PluginsPath(), "bundled") +} + +func InstalledPluginsPath() string { + return filepath.Join(PluginsPath(), "installed") +} diff --git a/internal/config/runtime_plugins.go b/internal/config/runtime_plugins.go new file mode 100644 index 00000000..67244ac0 --- /dev/null +++ b/internal/config/runtime_plugins.go @@ -0,0 +1,106 @@ +package config + +import ( + "context" + "encoding/json" + "fmt" +) + +const runtimePluginsSettingKey = "runtime_plugins" + +var defaultRuntimeToolBindings = map[string]string{ + "read": "tool/read", + "bash": "tool/bash", + "edit": "tool/edit", + "write": "tool/write", + "webfetch": "tool/webfetch", +} + +var defaultRuntimeChannelBindings = map[string]string{ + "telegram": "channel/telegram", + "qq": "channel/qq", + "feishu": "channel/feishu", + "weixin": "channel/weixin", +} + +// RuntimePluginBindings configures which subprocess plugin handles each +// built-in tool or channel slot. The separate setting keeps the legacy JS +// plugin list untouched while runtime plugin bindings evolve independently. +type RuntimePluginBindings struct { + Tools map[string]string `json:"tools,omitempty"` + Channels map[string]string `json:"channels,omitempty"` +} + +func DefaultRuntimePluginBindings() RuntimePluginBindings { + return RuntimePluginBindings{ + Tools: copyStringMap(defaultRuntimeToolBindings), + Channels: copyStringMap(defaultRuntimeChannelBindings), + } +} + +func (b RuntimePluginBindings) ToolBinding(name string) string { + if id := b.Tools[name]; id != "" { + return id + } + return defaultRuntimeToolBindings[name] +} + +func (b RuntimePluginBindings) ChannelBinding(name string) string { + if id := b.Channels[name]; id != "" { + return id + } + return defaultRuntimeChannelBindings[name] +} + +func (b RuntimePluginBindings) EffectiveTools() map[string]string { + out := copyStringMap(defaultRuntimeToolBindings) + for name, id := range b.Tools { + if id != "" { + out[name] = id + } + } + return out +} + +func (b RuntimePluginBindings) EffectiveChannels() map[string]string { + out := copyStringMap(defaultRuntimeChannelBindings) + for name, id := range b.Channels { + if id != "" { + out[name] = id + } + } + return out +} + +func LoadRuntimePluginBindings(store Store) (RuntimePluginBindings, error) { + if store == nil { + return DefaultRuntimePluginBindings(), nil + } + + val, err := store.GetSetting(context.Background(), runtimePluginsSettingKey) + if err != nil || val == "" { + return DefaultRuntimePluginBindings(), nil + } + + bindings := DefaultRuntimePluginBindings() + if err := json.Unmarshal([]byte(val), &bindings); err != nil { + return RuntimePluginBindings{}, fmt.Errorf("parse %s: %w", runtimePluginsSettingKey, err) + } + return bindings, nil +} + +func SaveRuntimePluginBindings(store Store, bindings RuntimePluginBindings) error { + data, err := json.Marshal(bindings) + if err != nil { + return fmt.Errorf("marshal %s: %w", runtimePluginsSettingKey, err) + } + return store.SetSetting(context.Background(), runtimePluginsSettingKey, string(data)) +} + +func copyStringMap(src map[string]string) map[string]string { + out := make(map[string]string, len(src)) + for k, v := range src { + out[k] = v + } + return out +} diff --git a/internal/config/runtime_plugins_test.go b/internal/config/runtime_plugins_test.go new file mode 100644 index 00000000..18c3b9ae --- /dev/null +++ b/internal/config/runtime_plugins_test.go @@ -0,0 +1,46 @@ +package config + +import "testing" + +func TestLoadRuntimePluginBindingsDefaults(t *testing.T) { + store := setupDBStore(t) + + bindings, err := LoadRuntimePluginBindings(store) + if err != nil { + t.Fatalf("LoadRuntimePluginBindings: %v", err) + } + + if got := bindings.ToolBinding("read"); got != "tool/read" { + t.Fatalf("ToolBinding(read) = %q, want tool/read", got) + } + if got := bindings.ChannelBinding("telegram"); got != "channel/telegram" { + t.Fatalf("ChannelBinding(telegram) = %q, want channel/telegram", got) + } +} + +func TestSaveRuntimePluginBindingsRoundTrip(t *testing.T) { + store := setupDBStore(t) + + bindings := DefaultRuntimePluginBindings() + bindings.Tools["read"] = "tool/custom-read" + bindings.Channels["telegram"] = "channel/custom-telegram" + + if err := SaveRuntimePluginBindings(store, bindings); err != nil { + t.Fatalf("SaveRuntimePluginBindings: %v", err) + } + + got, err := LoadRuntimePluginBindings(store) + if err != nil { + t.Fatalf("LoadRuntimePluginBindings: %v", err) + } + + if id := got.ToolBinding("read"); id != "tool/custom-read" { + t.Fatalf("ToolBinding(read) = %q, want tool/custom-read", id) + } + if id := got.ChannelBinding("telegram"); id != "channel/custom-telegram" { + t.Fatalf("ChannelBinding(telegram) = %q, want channel/custom-telegram", id) + } + if id := got.ToolBinding("bash"); id != "tool/bash" { + t.Fatalf("ToolBinding(bash) = %q, want default tool/bash", id) + } +} diff --git a/internal/config/snapshot.go b/internal/config/snapshot.go index 2c985eca..b54b41bb 100644 --- a/internal/config/snapshot.go +++ b/internal/config/snapshot.go @@ -21,19 +21,20 @@ type Snapshot struct { // Provider, APIKey, BaseURL are the default provider credentials derived // from the Model field's provider prefix. Kept for backward compatibility. - Provider string - Model string - ModelStrong string - ModelFast string - Workspace string - APIKey string - BaseURL string - SystemPrompt string // agent's soul/personality from DB - Runner RunnerConfig - Compaction CompactionConfig - Heartbeat HeartbeatConfig - Scheduler SchedulerConfig - Plugins []PluginConfig + Provider string + Model string + ModelStrong string + ModelFast string + Workspace string + APIKey string + BaseURL string + SystemPrompt string // agent's soul/personality from DB + Runner RunnerConfig + Compaction CompactionConfig + Heartbeat HeartbeatConfig + Scheduler SchedulerConfig + Plugins []PluginConfig + RuntimePlugins RuntimePluginBindings // Providers maps provider ID to credentials, enabling per-tier provider // resolution when model_strong or model_fast use a different provider. diff --git a/internal/pluginapi/types.go b/internal/pluginapi/types.go new file mode 100644 index 00000000..86e8082f --- /dev/null +++ b/internal/pluginapi/types.go @@ -0,0 +1,155 @@ +package pluginapi + +import "encoding/json" + +const ProtocolVersion = "anna-plugin/v1" + +type PluginKind string + +const ( + KindTool PluginKind = "tool" + KindChannel PluginKind = "channel" +) + +type Capability string + +const ( + CapabilityToolCall Capability = "tool.call" + CapabilityChannelStart Capability = "channel.start" + CapabilityChannelStop Capability = "channel.stop" + CapabilityChannelNotify Capability = "channel.notify" + CapabilityChannelInbound Capability = "channel.inbound" + CapabilityHealthCheck Capability = "health.check" + CapabilityGracefulShutdown Capability = "shutdown.graceful" +) + +type Manifest struct { + Name string `json:"name"` + Version string `json:"version"` + Kind PluginKind `json:"kind"` + ProtocolVersion string `json:"protocol_version"` + Entrypoint string `json:"entrypoint"` + Args []string `json:"args,omitempty"` + Description string `json:"description,omitempty"` + Tool *ToolSpec `json:"tool,omitempty"` + Capabilities []Capability `json:"capabilities,omitempty"` + ConfigSchema map[string]any `json:"config_schema,omitempty"` + Permissions map[string]any `json:"permissions,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` +} + +type ToolSpec struct { + Name string `json:"name"` + Description string `json:"description"` + InputSchema map[string]any `json:"input_schema,omitempty"` +} + +type MessageType string + +const ( + MessageTypeRequest MessageType = "request" + MessageTypeResponse MessageType = "response" + MessageTypeEvent MessageType = "event" +) + +type Envelope struct { + ID string `json:"id,omitempty"` + Type MessageType `json:"type"` + Method string `json:"method,omitempty"` + Event string `json:"event,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *RPCError `json:"error,omitempty"` +} + +type RPCError struct { + Code string `json:"code"` + Message string `json:"message"` +} + +func (e *RPCError) Error() string { + if e == nil { + return "" + } + if e.Code == "" { + return e.Message + } + if e.Message == "" { + return e.Code + } + return e.Code + ": " + e.Message +} + +type HandshakeRequest struct { + ProtocolVersion string `json:"protocol_version"` +} + +type HandshakeResponse struct { + ProtocolVersion string `json:"protocol_version"` + Name string `json:"name"` + Version string `json:"version"` + Kind PluginKind `json:"kind"` + Capabilities []Capability `json:"capabilities,omitempty"` + Tool *ToolSpec `json:"tool,omitempty"` +} + +type HealthResponse struct { + OK bool `json:"ok"` +} + +type ToolCallRequest struct { + Name string `json:"name"` + Arguments map[string]any `json:"arguments"` +} + +type ToolCallResponse struct { + Output string `json:"output,omitempty"` + Error string `json:"error,omitempty"` +} + +type ChannelNotification struct { + Channel string `json:"channel,omitempty"` + ChatID string `json:"chat_id,omitempty"` + Text string `json:"text,omitempty"` + Silent bool `json:"silent,omitempty"` +} + +type ChannelStartRequest struct { + Config json.RawMessage `json:"config,omitempty"` +} + +type ChannelStartResponse struct { + Started bool `json:"started"` +} + +type ChannelStopRequest struct{} + +type ChannelStopResponse struct { + Stopped bool `json:"stopped"` +} + +type ChannelNotifyRequest struct { + Notification ChannelNotification `json:"notification"` +} + +type ChannelNotifyResponse struct { + Delivered bool `json:"delivered"` + Error string `json:"error,omitempty"` +} + +type ChannelInboundMessage struct { + Platform string `json:"platform"` + ChatID string `json:"chat_id,omitempty"` + SenderID string `json:"sender_id,omitempty"` + SenderName string `json:"sender_name,omitempty"` + MessageID string `json:"message_id,omitempty"` + IsGroup bool `json:"is_group,omitempty"` + Text string `json:"text,omitempty"` + Payload map[string]any `json:"payload,omitempty"` +} + +type ChannelEvent struct { + Kind string `json:"kind"` + Message *ChannelInboundMessage `json:"message,omitempty"` + Error *RPCError `json:"error,omitempty"` +} diff --git a/internal/pluginhost/catalog.go b/internal/pluginhost/catalog.go new file mode 100644 index 00000000..dc2fb746 --- /dev/null +++ b/internal/pluginhost/catalog.go @@ -0,0 +1,97 @@ +package pluginhost + +import ( + "fmt" + "io/fs" + "path/filepath" + "sort" +) + +type Catalog struct { + defs map[string]Definition +} + +func NewCatalog() *Catalog { + return &Catalog{defs: make(map[string]Definition)} +} + +func Discover(roots ...string) (*Catalog, error) { + catalog := NewCatalog() + + for _, root := range roots { + if root == "" { + continue + } + if err := walkRoot(root, catalog.defs); err != nil { + return nil, err + } + } + + return catalog, nil +} + +func (c *Catalog) Add(def Definition) error { + if c == nil { + return fmt.Errorf("catalog is nil") + } + if existing, ok := c.defs[def.ID()]; ok { + return fmt.Errorf("duplicate plugin definition %q: %s and %s", def.ID(), existing.ManifestPath, def.ManifestPath) + } + c.defs[def.ID()] = def + return nil +} + +func (c *Catalog) Merge(defs ...Definition) error { + for _, def := range defs { + if err := c.Add(def); err != nil { + return err + } + } + return nil +} + +func walkRoot(root string, defs map[string]Definition) error { + return filepath.WalkDir(root, func(path string, entry fs.DirEntry, err error) error { + if err != nil { + return err + } + if entry.IsDir() || entry.Name() != ManifestFilename { + return nil + } + + def, err := LoadDefinition(path) + if err != nil { + return err + } + if existing, ok := defs[def.ID()]; ok { + return fmt.Errorf("duplicate plugin definition %q: %s and %s", def.ID(), existing.ManifestPath, path) + } + defs[def.ID()] = def + return nil + }) +} + +func (c *Catalog) Get(id string) (Definition, bool) { + if c == nil { + return Definition{}, false + } + def, ok := c.defs[id] + return def, ok +} + +func (c *Catalog) List() []Definition { + if c == nil { + return nil + } + out := make([]Definition, 0, len(c.defs)) + for _, def := range c.defs { + out = append(out, def) + } + sort.Slice(out, func(i, j int) bool { + if out[i].Manifest.Kind == out[j].Manifest.Kind { + return out[i].Manifest.Name < out[j].Manifest.Name + } + return out[i].Manifest.Kind < out[j].Manifest.Kind + }) + return out +} diff --git a/internal/pluginhost/catalog_test.go b/internal/pluginhost/catalog_test.go new file mode 100644 index 00000000..ab48ebe3 --- /dev/null +++ b/internal/pluginhost/catalog_test.go @@ -0,0 +1,79 @@ +package pluginhost + +import ( + "os" + "path/filepath" + "testing" + + "github.com/vaayne/anna/internal/pluginapi" +) + +func TestDiscover(t *testing.T) { + root := t.TempDir() + writeManifest(t, filepath.Join(root, "tool", "plugin.json"), pluginapi.Manifest{ + Name: "read", + Version: "1.0.0", + Kind: pluginapi.KindTool, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: "plugin.sh", + Tool: &pluginapi.ToolSpec{ + Name: "read", + Description: "read", + InputSchema: map[string]any{}, + }, + }) + if err := os.WriteFile(filepath.Join(root, "tool", "plugin.sh"), []byte("#!/bin/sh\n"), 0o755); err != nil { + t.Fatal(err) + } + + catalog, err := Discover(root) + if err != nil { + t.Fatalf("Discover() error = %v", err) + } + if got := len(catalog.List()); got != 1 { + t.Fatalf("len(List()) = %d, want 1", got) + } + if _, ok := catalog.Get("tool/read"); !ok { + t.Fatalf("expected tool/read in catalog") + } +} + +func TestDiscoverDuplicate(t *testing.T) { + root := t.TempDir() + + writeManifest(t, filepath.Join(root, "a", "plugin.json"), pluginapi.Manifest{ + Name: "read", + Version: "1.0.0", + Kind: pluginapi.KindTool, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: "plugin.sh", + Tool: &pluginapi.ToolSpec{ + Name: "read", + Description: "read", + InputSchema: map[string]any{}, + }, + }) + writeManifest(t, filepath.Join(root, "b", "plugin.json"), pluginapi.Manifest{ + Name: "read", + Version: "1.0.1", + Kind: pluginapi.KindTool, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: "plugin.sh", + Tool: &pluginapi.ToolSpec{ + Name: "read", + Description: "read", + InputSchema: map[string]any{}, + }, + }) + + if err := os.WriteFile(filepath.Join(root, "a", "plugin.sh"), []byte("#!/bin/sh\n"), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(root, "b", "plugin.sh"), []byte("#!/bin/sh\n"), 0o755); err != nil { + t.Fatal(err) + } + + if _, err := Discover(root); err == nil { + t.Fatal("Discover() error = nil, want duplicate error") + } +} diff --git a/internal/pluginhost/channeladapter.go b/internal/pluginhost/channeladapter.go new file mode 100644 index 00000000..d47c7244 --- /dev/null +++ b/internal/pluginhost/channeladapter.go @@ -0,0 +1,103 @@ +package pluginhost + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/vaayne/anna/internal/pluginapi" +) + +// ChannelAdapter exposes a channel plugin as a restartable host-side runner. +type ChannelAdapter struct { + def Definition + logger *slog.Logger + + supervisor *Supervisor + mu sync.Mutex + stopped bool +} + +func NewChannelAdapter(def Definition, opts SupervisorOptions) *ChannelAdapter { + logger := opts.Logger + if logger == nil { + logger = slog.Default() + } + return &ChannelAdapter{ + def: def, + logger: logger.With("plugin", def.ID()), + supervisor: NewSupervisor(def, opts), + } +} + +func (a *ChannelAdapter) Name() string { return a.def.Manifest.Name } + +func (a *ChannelAdapter) Start(ctx context.Context) error { + client, err := a.supervisor.Start(ctx) + if err != nil { + return err + } + waitCh := make(chan error, 1) + go func(c *Client) { waitCh <- c.Wait() }(client) + + for { + select { + case <-ctx.Done(): + a.markStopped() + _ = a.supervisor.Close() + return ctx.Err() + case err := <-waitCh: + if ctx.Err() != nil || a.isStopped() { + return ctx.Err() + } + if err != nil { + a.logger.Warn("channel plugin exited, restarting", "error", err) + } else { + a.logger.Warn("channel plugin exited, restarting") + } + client, err = a.supervisor.Restart(ctx) + if err != nil { + return fmt.Errorf("restart channel plugin: %w", err) + } + waitCh = make(chan error, 1) + go func(c *Client) { waitCh <- c.Wait() }(client) + } + } +} + +func (a *ChannelAdapter) Stop() { + a.markStopped() + _ = a.supervisor.Close() +} + +func (a *ChannelAdapter) Notify(ctx context.Context, n pluginapi.ChannelNotification) error { + client, err := a.supervisor.EnsureHealthy(ctx) + if err != nil { + return err + } + + var resp pluginapi.ChannelNotifyResponse + if err := client.Request(ctx, "notify", pluginapi.ChannelNotifyRequest{Notification: n}, &resp); err != nil { + return err + } + if !resp.Delivered { + if resp.Error != "" { + return fmt.Errorf("%s", resp.Error) + } + return fmt.Errorf("channel notification not delivered") + } + return nil +} + +func (a *ChannelAdapter) markStopped() { + a.mu.Lock() + a.stopped = true + a.mu.Unlock() +} + +func (a *ChannelAdapter) isStopped() bool { + a.mu.Lock() + defer a.mu.Unlock() + return a.stopped +} diff --git a/internal/pluginhost/channeladapter_test.go b/internal/pluginhost/channeladapter_test.go new file mode 100644 index 00000000..a8c40c84 --- /dev/null +++ b/internal/pluginhost/channeladapter_test.go @@ -0,0 +1,139 @@ +package pluginhost + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/vaayne/anna/internal/pluginapi" +) + +func TestChannelAdapterRestartsAfterCrash(t *testing.T) { + def := testChannelDefinitionWithEnv(t, map[string]string{ + "ANNA_PLUGIN_HELPER_CHANNEL_EXIT_ON_NOTIFY": "1", + }) + + adapter := NewChannelAdapter(def, SupervisorOptions{RestartDelay: time.Millisecond}) + defer func() { _ = adapter.supervisor.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := adapter.Notify(ctx, pluginapi.ChannelNotification{Text: "first"}); err != nil { + t.Fatalf("Notify() first error = %v", err) + } + + time.Sleep(50 * time.Millisecond) + + if err := adapter.Notify(ctx, pluginapi.ChannelNotification{Text: "second"}); err != nil { + t.Fatalf("Notify() second error = %v", err) + } + + if got := adapter.supervisor.RestartCount(); got < 1 { + t.Fatalf("RestartCount() = %d, want >= 1", got) + } +} + +func TestHelperChannelPluginProcess(t *testing.T) { + if os.Getenv("ANNA_PLUGIN_HELPER_CHANNEL_PROCESS") != "1" { + return + } + + decoder := json.NewDecoder(os.Stdin) + encoder := json.NewEncoder(os.Stdout) + for { + var env pluginapi.Envelope + if err := decoder.Decode(&env); err != nil { + return + } + + switch env.Method { + case "handshake": + writeTestChannelResponse(t, encoder, env.ID, pluginapi.HandshakeResponse{ + ProtocolVersion: pluginapi.ProtocolVersion, + Name: "helper-channel", + Version: "1.0.0", + Kind: pluginapi.KindChannel, + Capabilities: []pluginapi.Capability{ + pluginapi.CapabilityChannelStart, + pluginapi.CapabilityChannelStop, + pluginapi.CapabilityChannelNotify, + pluginapi.CapabilityChannelInbound, + pluginapi.CapabilityHealthCheck, + pluginapi.CapabilityGracefulShutdown, + }, + }) + case "health": + writeTestChannelResponse(t, encoder, env.ID, pluginapi.HealthResponse{OK: true}) + case "notify": + var req pluginapi.ChannelNotifyRequest + if err := json.Unmarshal(env.Params, &req); err != nil { + writeTestChannelResponse(t, encoder, env.ID, pluginapi.ChannelNotifyResponse{Delivered: false, Error: err.Error()}) + continue + } + writeTestChannelResponse(t, encoder, env.ID, pluginapi.ChannelNotifyResponse{Delivered: true}) + if os.Getenv("ANNA_PLUGIN_HELPER_CHANNEL_EXIT_ON_NOTIFY") == "1" { + return + } + case "shutdown", "stop_channel": + writeTestChannelResponse(t, encoder, env.ID, pluginapi.ChannelStopResponse{Stopped: true}) + return + default: + writeTestChannelResponse(t, encoder, env.ID, pluginapi.ChannelNotifyResponse{Delivered: false, Error: env.Method}) + } + } +} + +func testChannelDefinitionWithEnv(t *testing.T, extraEnv map[string]string) Definition { + t.Helper() + root := t.TempDir() + entry := filepath.Join(root, "helper.sh") + var envPrefix string + for k, v := range extraEnv { + envPrefix += fmt.Sprintf("%s=%q ", k, v) + } + script := fmt.Sprintf("#!/bin/sh\nANNA_PLUGIN_HELPER_CHANNEL_PROCESS=1 %sexec %q -test.run TestHelperChannelPluginProcess --\n", envPrefix, os.Args[0]) + if err := os.WriteFile(entry, []byte(script), 0o755); err != nil { + t.Fatal(err) + } + manifestPath := filepath.Join(root, ManifestFilename) + writeManifest(t, manifestPath, pluginapi.Manifest{ + Name: "helper-channel", + Version: "1.0.0", + Kind: pluginapi.KindChannel, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: "helper.sh", + Capabilities: []pluginapi.Capability{ + pluginapi.CapabilityChannelStart, + pluginapi.CapabilityChannelStop, + pluginapi.CapabilityChannelNotify, + pluginapi.CapabilityChannelInbound, + pluginapi.CapabilityHealthCheck, + pluginapi.CapabilityGracefulShutdown, + }, + }) + def, err := LoadDefinition(manifestPath) + if err != nil { + t.Fatal(err) + } + return def +} + +func writeTestChannelResponse(t *testing.T, encoder *json.Encoder, id string, payload any) { + t.Helper() + raw, err := json.Marshal(payload) + if err != nil { + t.Fatal(err) + } + if err := encoder.Encode(pluginapi.Envelope{ + ID: id, + Type: pluginapi.MessageTypeResponse, + Result: raw, + }); err != nil { + t.Fatal(err) + } +} diff --git a/internal/pluginhost/client.go b/internal/pluginhost/client.go new file mode 100644 index 00000000..aca6d1c0 --- /dev/null +++ b/internal/pluginhost/client.go @@ -0,0 +1,270 @@ +package pluginhost + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + "github.com/vaayne/anna/internal/pluginapi" +) + +type Client struct { + def Definition + logger *slog.Logger + + cmd *exec.Cmd + stdin io.WriteCloser + stdout *bufio.Reader + + reqMu sync.Mutex + closeMu sync.Mutex + closed atomic.Bool + cancelMu sync.Mutex + exitErr atomic.Pointer[error] + waitDone chan struct{} +} + +type StartOptions struct { + Logger *slog.Logger +} + +const builtinRuntimeShutdownTimeout = 250 * time.Millisecond + +func Start(ctx context.Context, def Definition, opts StartOptions) (*Client, error) { + logger := opts.Logger + if logger == nil { + logger = slog.Default() + } + + entrypoint := def.Entrypoint() + if entrypoint == BuiltinEntrypoint { + if override := os.Getenv("ANNA_PLUGIN_ENTRYPOINT"); override != "" { + entrypoint = override + } else { + exePath, err := os.Executable() + if err != nil { + return nil, fmt.Errorf("resolve current executable: %w", err) + } + helperName := "anna-plugin" + filepath.Ext(exePath) + entrypoint = filepath.Join(filepath.Dir(exePath), helperName) + } + } + + cmd := exec.CommandContext(ctx, entrypoint, def.Manifest.Args...) + cmd.Dir = def.RootDir + if def.Manifest.Metadata != nil && def.Manifest.Entrypoint == BuiltinEntrypoint { + env := os.Environ() + if v, ok := def.Manifest.Metadata["work_dir"].(string); ok && v != "" { + env = append(env, "ANNA_PLUGIN_WORKDIR="+v) + } + if v, ok := def.Manifest.Metadata["user_data_dir"].(string); ok && v != "" { + env = append(env, "ANNA_PLUGIN_USER_DATA_DIR="+v) + } + cmd.Env = env + } + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("stdin pipe: %w", err) + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("stdout pipe: %w", err) + } + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("stderr pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("start process: %w", err) + } + + c := &Client{ + def: def, + logger: logger.With("plugin", def.ID()), + cmd: cmd, + stdin: stdin, + stdout: bufio.NewReader(stdout), + waitDone: make(chan struct{}), + } + + go c.consumeStderr(stderr) + go c.waitProcess() + + var hs pluginapi.HandshakeResponse + if err := c.Request(ctx, "handshake", pluginapi.HandshakeRequest{ + ProtocolVersion: pluginapi.ProtocolVersion, + }, &hs); err != nil { + _ = c.Close() + return nil, err + } + + if hs.ProtocolVersion != pluginapi.ProtocolVersion { + _ = c.Close() + return nil, fmt.Errorf("plugin %s protocol mismatch: got %q want %q", def.ID(), hs.ProtocolVersion, pluginapi.ProtocolVersion) + } + if hs.Name != def.Manifest.Name || hs.Kind != def.Manifest.Kind { + _ = c.Close() + return nil, fmt.Errorf("plugin %s handshake mismatch: got name=%q kind=%q", def.ID(), hs.Name, hs.Kind) + } + + return c, nil +} + +func (c *Client) Request(ctx context.Context, method string, params any, out any) error { + if c.isClosed() { + return errors.New("plugin client is closed") + } + return c.request(ctx, method, params, out) +} + +func (c *Client) Health(ctx context.Context) error { + var resp pluginapi.HealthResponse + if err := c.Request(ctx, "health", struct{}{}, &resp); err != nil { + return err + } + if !resp.OK { + return errors.New("plugin reported unhealthy") + } + return nil +} + +func (c *Client) Alive() bool { + select { + case <-c.waitDone: + return false + default: + return true + } +} + +func (c *Client) Wait() error { + <-c.waitDone + ptr := c.exitErr.Load() + if ptr == nil { + return nil + } + return *ptr +} + +func (c *Client) Close() error { + c.closeMu.Lock() + defer c.closeMu.Unlock() + if c.closed.Load() { + return nil + } + c.closed.Store(true) + + ctx, cancel := context.WithTimeout(context.Background(), builtinRuntimeShutdownTimeout) + defer cancel() + _ = c.request(ctx, "shutdown", struct{}{}, nil) + if c.stdin != nil { + _ = c.stdin.Close() + } + if c.cmd != nil && c.Alive() { + select { + case <-c.waitDone: + case <-time.After(builtinRuntimeShutdownTimeout): + _ = c.cmd.Process.Kill() + } + } + return c.Wait() +} + +func (c *Client) waitProcess() { + err := c.cmd.Wait() + if err != nil { + c.exitErr.Store(&err) + } + close(c.waitDone) +} + +func (c *Client) consumeStderr(r io.Reader) { + scanner := bufio.NewScanner(r) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + c.logger.Info("plugin stderr", "line", line) + } + if err := scanner.Err(); err != nil { + c.logger.Warn("plugin stderr read failed", "error", err) + } +} + +func (c *Client) isClosed() bool { + return c.closed.Load() +} + +func (c *Client) request(ctx context.Context, method string, params any, out any) error { + c.reqMu.Lock() + defer c.reqMu.Unlock() + + payload, err := json.Marshal(params) + if err != nil { + return fmt.Errorf("marshal params: %w", err) + } + + env := pluginapi.Envelope{ + ID: uuid.NewString(), + Type: pluginapi.MessageTypeRequest, + Method: method, + Params: payload, + } + if err := writeEnvelope(c.stdin, env); err != nil { + return err + } + + done := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + c.abortCurrentRequest() + case <-done: + } + }() + defer close(done) + + resp, err := readEnvelope(c.stdout) + if err != nil { + if ctxErr := ctx.Err(); ctxErr != nil { + return ctxErr + } + return err + } + if resp.Type != pluginapi.MessageTypeResponse { + return fmt.Errorf("unexpected message type %q", resp.Type) + } + if resp.Error != nil { + return resp.Error + } + if out == nil || len(resp.Result) == 0 { + return nil + } + if err := json.Unmarshal(resp.Result, out); err != nil { + return fmt.Errorf("decode response: %w", err) + } + return nil +} + +func (c *Client) abortCurrentRequest() { + c.cancelMu.Lock() + defer c.cancelMu.Unlock() + if c.cmd != nil && c.Alive() && c.cmd.Process != nil { + _ = c.cmd.Process.Kill() + } +} diff --git a/internal/pluginhost/client_test.go b/internal/pluginhost/client_test.go new file mode 100644 index 00000000..d4ebdc34 --- /dev/null +++ b/internal/pluginhost/client_test.go @@ -0,0 +1,210 @@ +package pluginhost + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/vaayne/anna/internal/pluginapi" +) + +func TestClientHandshakeAndHealth(t *testing.T) { + def := testDefinition(t) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + client, err := Start(ctx, def, StartOptions{}) + if err != nil { + t.Fatalf("Start() error = %v", err) + } + defer func() { _ = client.Close() }() + + if err := client.Health(ctx); err != nil { + t.Fatalf("Health() error = %v", err) + } +} + +func TestSupervisorRestartsDeadPlugin(t *testing.T) { + def := testDefinition(t) + supervisor := NewSupervisor(def, SupervisorOptions{RestartDelay: time.Millisecond}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + client, err := supervisor.Start(ctx) + if err != nil { + t.Fatalf("Start() error = %v", err) + } + if err := client.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + + restarted, err := supervisor.EnsureHealthy(ctx) + if err != nil { + t.Fatalf("EnsureHealthy() error = %v", err) + } + defer func() { _ = supervisor.Close() }() + + if restarted == nil || !restarted.Alive() { + t.Fatal("expected restarted client to be alive") + } + if got := supervisor.RestartCount(); got < 1 { + t.Fatalf("RestartCount() = %d, want >= 1", got) + } +} + +func TestClientCancelsByKillingPluginProcess(t *testing.T) { + def := testDefinitionWithEnv(t, map[string]string{ + "ANNA_PLUGIN_HELPER_SLOW_HEALTH": "1", + }) + + client, err := Start(context.Background(), def, StartOptions{}) + if err != nil { + t.Fatalf("Start() error = %v", err) + } + defer func() { _ = client.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + err = client.Health(ctx) + if err == nil { + t.Fatal("Health() error = nil, want deadline exceeded") + } + if err != context.DeadlineExceeded { + t.Fatalf("Health() error = %v, want %v", err, context.DeadlineExceeded) + } + + time.Sleep(50 * time.Millisecond) + if client.Alive() { + t.Fatal("expected plugin process to be terminated after request cancellation") + } +} + +func TestClientCloseKillsUnresponsivePlugin(t *testing.T) { + def := testDefinitionWithEnv(t, map[string]string{ + "ANNA_PLUGIN_HELPER_SLOW_SHUTDOWN": "1", + }) + + client, err := Start(context.Background(), def, StartOptions{}) + if err != nil { + t.Fatalf("Start() error = %v", err) + } + + start := time.Now() + err = client.Close() + if err == nil { + t.Fatal("Close() error = nil, want killed process error") + } + if time.Since(start) > time.Second { + t.Fatalf("Close() took too long: %v", time.Since(start)) + } + if client.Alive() { + t.Fatal("expected plugin process to be terminated after slow shutdown") + } +} + +func TestHelperPluginProcess(t *testing.T) { + if os.Getenv("ANNA_PLUGIN_HELPER_PROCESS") != "1" { + return + } + + decoder := json.NewDecoder(os.Stdin) + encoder := json.NewEncoder(os.Stdout) + for { + var env pluginapi.Envelope + if err := decoder.Decode(&env); err != nil { + return + } + + switch env.Method { + case "handshake": + writeTestResponse(t, encoder, env.ID, pluginapi.HandshakeResponse{ + ProtocolVersion: pluginapi.ProtocolVersion, + Name: "helper", + Version: "1.0.0", + Kind: pluginapi.KindTool, + Capabilities: []pluginapi.Capability{ + pluginapi.CapabilityHealthCheck, + pluginapi.CapabilityGracefulShutdown, + }, + }) + case "health": + if os.Getenv("ANNA_PLUGIN_HELPER_SLOW_HEALTH") == "1" { + time.Sleep(5 * time.Second) + } + writeTestResponse(t, encoder, env.ID, pluginapi.HealthResponse{OK: true}) + case "shutdown": + if os.Getenv("ANNA_PLUGIN_HELPER_SLOW_SHUTDOWN") == "1" { + time.Sleep(5 * time.Second) + } + writeTestResponse(t, encoder, env.ID, struct{}{}) + return + default: + _ = encoder.Encode(pluginapi.Envelope{ + ID: env.ID, + Type: pluginapi.MessageTypeResponse, + Error: &pluginapi.RPCError{ + Code: "unknown_method", + Message: env.Method, + }, + }) + } + } +} + +func testDefinition(t *testing.T) Definition { + return testDefinitionWithEnv(t, nil) +} + +func testDefinitionWithEnv(t *testing.T, extraEnv map[string]string) Definition { + t.Helper() + root := t.TempDir() + entry := filepath.Join(root, "helper.sh") + var envPrefix string + for k, v := range extraEnv { + envPrefix += fmt.Sprintf("%s=%q ", k, v) + } + script := fmt.Sprintf("#!/bin/sh\nANNA_PLUGIN_HELPER_PROCESS=1 %sexec %q -test.run TestHelperPluginProcess --\n", envPrefix, os.Args[0]) + if err := os.WriteFile(entry, []byte(script), 0o755); err != nil { + t.Fatal(err) + } + manifestPath := filepath.Join(root, ManifestFilename) + writeManifest(t, manifestPath, pluginapi.Manifest{ + Name: "helper", + Version: "1.0.0", + Kind: pluginapi.KindTool, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: "helper.sh", + Tool: &pluginapi.ToolSpec{ + Name: "helper", + Description: "helper", + InputSchema: map[string]any{}, + }, + }) + def, err := LoadDefinition(manifestPath) + if err != nil { + t.Fatal(err) + } + return def +} + +func writeTestResponse(t *testing.T, encoder *json.Encoder, id string, payload any) { + t.Helper() + raw, err := json.Marshal(payload) + if err != nil { + t.Fatal(err) + } + if err := encoder.Encode(pluginapi.Envelope{ + ID: id, + Type: pluginapi.MessageTypeResponse, + Result: raw, + }); err != nil { + t.Fatal(err) + } +} diff --git a/internal/pluginhost/manifest.go b/internal/pluginhost/manifest.go new file mode 100644 index 00000000..8f712542 --- /dev/null +++ b/internal/pluginhost/manifest.go @@ -0,0 +1,87 @@ +package pluginhost + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/vaayne/anna/internal/pluginapi" +) + +const ManifestFilename = "plugin.json" +const BuiltinEntrypoint = "@anna" + +type Definition struct { + Manifest pluginapi.Manifest + ManifestPath string + RootDir string +} + +func LoadDefinition(path string) (Definition, error) { + data, err := os.ReadFile(path) + if err != nil { + return Definition{}, fmt.Errorf("read manifest: %w", err) + } + + var manifest pluginapi.Manifest + if err := json.Unmarshal(data, &manifest); err != nil { + return Definition{}, fmt.Errorf("parse manifest: %w", err) + } + + def := Definition{ + Manifest: manifest, + ManifestPath: path, + RootDir: filepath.Dir(path), + } + if err := def.Validate(); err != nil { + return Definition{}, err + } + return def, nil +} + +func (d Definition) Validate() error { + switch { + case d.Manifest.Name == "": + return fmt.Errorf("manifest %s: name is required", d.ManifestPath) + case d.Manifest.Version == "": + return fmt.Errorf("manifest %s: version is required", d.ManifestPath) + case d.Manifest.Kind == "": + return fmt.Errorf("manifest %s: kind is required", d.ManifestPath) + case d.Manifest.ProtocolVersion == "": + return fmt.Errorf("manifest %s: protocol_version is required", d.ManifestPath) + case d.Manifest.ProtocolVersion != pluginapi.ProtocolVersion: + return fmt.Errorf("manifest %s: unsupported protocol_version %q", d.ManifestPath, d.Manifest.ProtocolVersion) + case d.Manifest.Entrypoint == "": + return fmt.Errorf("manifest %s: entrypoint is required", d.ManifestPath) + case d.Manifest.Kind == pluginapi.KindTool && d.Manifest.Tool == nil: + return fmt.Errorf("manifest %s: tool definition is required for tool plugins", d.ManifestPath) + } + + entrypoint := d.Entrypoint() + if entrypoint != BuiltinEntrypoint { + info, err := os.Stat(entrypoint) + if err != nil { + return fmt.Errorf("manifest %s: entrypoint %q: %w", d.ManifestPath, entrypoint, err) + } + if info.IsDir() { + return fmt.Errorf("manifest %s: entrypoint %q is a directory", d.ManifestPath, entrypoint) + } + } + + return nil +} + +func (d Definition) Entrypoint() string { + if d.Manifest.Entrypoint == BuiltinEntrypoint { + return BuiltinEntrypoint + } + if filepath.IsAbs(d.Manifest.Entrypoint) { + return d.Manifest.Entrypoint + } + return filepath.Join(d.RootDir, d.Manifest.Entrypoint) +} + +func (d Definition) ID() string { + return string(d.Manifest.Kind) + "/" + d.Manifest.Name +} diff --git a/internal/pluginhost/manifest_test.go b/internal/pluginhost/manifest_test.go new file mode 100644 index 00000000..b4b421fd --- /dev/null +++ b/internal/pluginhost/manifest_test.go @@ -0,0 +1,65 @@ +package pluginhost + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/vaayne/anna/internal/pluginapi" +) + +func TestLoadDefinition(t *testing.T) { + root := t.TempDir() + entrypoint := filepath.Join(root, "plugin.sh") + if err := os.WriteFile(entrypoint, []byte("#!/bin/sh\n"), 0o755); err != nil { + t.Fatal(err) + } + + manifestPath := filepath.Join(root, ManifestFilename) + writeManifest(t, manifestPath, pluginapi.Manifest{ + Name: "telegram", + Version: "1.0.0", + Kind: pluginapi.KindChannel, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: "plugin.sh", + }) + + def, err := LoadDefinition(manifestPath) + if err != nil { + t.Fatalf("LoadDefinition() error = %v", err) + } + if got := def.Entrypoint(); got != entrypoint { + t.Fatalf("Entrypoint() = %q, want %q", got, entrypoint) + } +} + +func TestLoadDefinitionRejectsMissingEntrypoint(t *testing.T) { + root := t.TempDir() + manifestPath := filepath.Join(root, ManifestFilename) + writeManifest(t, manifestPath, pluginapi.Manifest{ + Name: "telegram", + Version: "1.0.0", + Kind: pluginapi.KindChannel, + ProtocolVersion: pluginapi.ProtocolVersion, + Entrypoint: "missing.sh", + }) + + if _, err := LoadDefinition(manifestPath); err == nil { + t.Fatal("LoadDefinition() error = nil, want missing entrypoint error") + } +} + +func writeManifest(t *testing.T, path string, manifest pluginapi.Manifest) { + t.Helper() + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + t.Fatal(err) + } + data, err := json.Marshal(manifest) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(path, data, 0o644); err != nil { + t.Fatal(err) + } +} diff --git a/internal/pluginhost/protocol.go b/internal/pluginhost/protocol.go new file mode 100644 index 00000000..62d3fa3c --- /dev/null +++ b/internal/pluginhost/protocol.go @@ -0,0 +1,34 @@ +package pluginhost + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + + "github.com/vaayne/anna/internal/pluginapi" +) + +func writeEnvelope(w io.Writer, env pluginapi.Envelope) error { + data, err := json.Marshal(env) + if err != nil { + return fmt.Errorf("marshal envelope: %w", err) + } + if _, err := w.Write(append(data, '\n')); err != nil { + return fmt.Errorf("write envelope: %w", err) + } + return nil +} + +func readEnvelope(r *bufio.Reader) (pluginapi.Envelope, error) { + line, err := r.ReadBytes('\n') + if err != nil { + return pluginapi.Envelope{}, err + } + + var env pluginapi.Envelope + if err := json.Unmarshal(line, &env); err != nil { + return pluginapi.Envelope{}, fmt.Errorf("decode envelope: %w", err) + } + return env, nil +} diff --git a/internal/pluginhost/server.go b/internal/pluginhost/server.go new file mode 100644 index 00000000..ab7a5be1 --- /dev/null +++ b/internal/pluginhost/server.go @@ -0,0 +1,220 @@ +package pluginhost + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + + "github.com/vaayne/anna/internal/pluginapi" +) + +// ToolRuntime is the minimal contract a tool implementation must satisfy to be +// served over the subprocess plugin protocol. +type ToolRuntime interface { + Execute(ctx context.Context, args map[string]any) (string, error) +} + +// ChannelRuntime is the minimal contract a channel implementation must satisfy +// to be served over the subprocess plugin protocol. +type ChannelRuntime interface { + Start(ctx context.Context) error + Notify(ctx context.Context, n pluginapi.ChannelNotification) error +} + +// ServeTool runs a single-tool plugin protocol loop on the given streams. +func ServeTool(ctx context.Context, def Definition, runtime ToolRuntime, in io.Reader, out io.Writer) error { + reader := bufio.NewReader(in) + for { + env, err := readEnvelope(reader) + if err != nil { + return err + } + + switch env.Method { + case "handshake": + result := pluginapi.HandshakeResponse{ + ProtocolVersion: pluginapi.ProtocolVersion, + Name: def.Manifest.Name, + Version: def.Manifest.Version, + Kind: def.Manifest.Kind, + Capabilities: def.Manifest.Capabilities, + Tool: def.Manifest.Tool, + } + if err := writeResponse(out, env.ID, result); err != nil { + return err + } + + case "health": + if err := writeResponse(out, env.ID, pluginapi.HealthResponse{OK: true}); err != nil { + return err + } + + case "call_tool": + var req pluginapi.ToolCallRequest + if err := json.Unmarshal(env.Params, &req); err != nil { + if err := writeError(out, env.ID, "bad_request", fmt.Sprintf("decode tool request: %v", err)); err != nil { + return err + } + continue + } + + result, err := runtime.Execute(ctx, req.Arguments) + resp := pluginapi.ToolCallResponse{Output: result} + if err != nil { + resp.Error = err.Error() + } + if err := writeResponse(out, env.ID, resp); err != nil { + return err + } + + case "shutdown": + if err := writeResponse(out, env.ID, struct{}{}); err != nil { + return err + } + return nil + + default: + if err := writeError(out, env.ID, "unknown_method", env.Method); err != nil { + return err + } + } + } +} + +// ServeChannel runs a channel plugin protocol loop on the given streams. +func ServeChannel(ctx context.Context, def Definition, runtime ChannelRuntime, in io.Reader, out io.Writer) error { + runtimeCtx, cancel := context.WithCancel(ctx) + defer cancel() + + reqCh := make(chan pluginapi.Envelope, 8) + readErrCh := make(chan error, 1) + go func() { + reader := bufio.NewReader(in) + for { + env, err := readEnvelope(reader) + if err != nil { + readErrCh <- err + close(reqCh) + return + } + reqCh <- env + } + }() + + runErrCh := make(chan error, 1) + go func() { + if err := runtime.Start(runtimeCtx); err != nil && runtimeCtx.Err() == nil { + runErrCh <- err + return + } + runErrCh <- nil + }() + + for { + select { + case err := <-runErrCh: + if err != nil { + return err + } + if runtimeCtx.Err() != nil { + return runtimeCtx.Err() + } + return nil + + case err := <-readErrCh: + if err != nil { + if runtimeCtx.Err() != nil { + return runtimeCtx.Err() + } + return err + } + + case env, ok := <-reqCh: + if !ok { + if runtimeCtx.Err() != nil { + return runtimeCtx.Err() + } + return nil + } + + switch env.Method { + case "handshake": + result := pluginapi.HandshakeResponse{ + ProtocolVersion: pluginapi.ProtocolVersion, + Name: def.Manifest.Name, + Version: def.Manifest.Version, + Kind: def.Manifest.Kind, + Capabilities: def.Manifest.Capabilities, + } + if err := writeResponse(out, env.ID, result); err != nil { + return err + } + + case "health": + if err := writeResponse(out, env.ID, pluginapi.HealthResponse{OK: runtimeCtx.Err() == nil}); err != nil { + return err + } + + case "start_channel": + if err := writeResponse(out, env.ID, pluginapi.ChannelStartResponse{Started: true}); err != nil { + return err + } + + case "notify": + var req pluginapi.ChannelNotifyRequest + if err := json.Unmarshal(env.Params, &req); err != nil { + if err := writeError(out, env.ID, "bad_request", fmt.Sprintf("decode notify request: %v", err)); err != nil { + return err + } + continue + } + + resp := pluginapi.ChannelNotifyResponse{Delivered: true} + if err := runtime.Notify(runtimeCtx, req.Notification); err != nil { + resp.Delivered = false + resp.Error = err.Error() + } + if err := writeResponse(out, env.ID, resp); err != nil { + return err + } + + case "stop_channel", "shutdown": + cancel() + if err := writeResponse(out, env.ID, pluginapi.ChannelStopResponse{Stopped: true}); err != nil { + return err + } + return nil + + default: + if err := writeError(out, env.ID, "unknown_method", env.Method); err != nil { + return err + } + } + } + } +} + +func writeResponse(w io.Writer, id string, payload any) error { + data, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal response: %w", err) + } + return writeEnvelope(w, pluginapi.Envelope{ + ID: id, + Type: pluginapi.MessageTypeResponse, + Result: data, + }) +} + +func writeError(w io.Writer, id, code, message string) error { + return writeEnvelope(w, pluginapi.Envelope{ + ID: id, + Type: pluginapi.MessageTypeResponse, + Error: &pluginapi.RPCError{ + Code: code, + Message: message, + }, + }) +} diff --git a/internal/pluginhost/supervisor.go b/internal/pluginhost/supervisor.go new file mode 100644 index 00000000..e876ec33 --- /dev/null +++ b/internal/pluginhost/supervisor.go @@ -0,0 +1,101 @@ +package pluginhost + +import ( + "context" + "log/slog" + "sync" + "time" +) + +type Supervisor struct { + def Definition + logger *slog.Logger + restartDelay time.Duration + + mu sync.Mutex + client *Client + restarts int +} + +type SupervisorOptions struct { + Logger *slog.Logger + RestartDelay time.Duration +} + +func NewSupervisor(def Definition, opts SupervisorOptions) *Supervisor { + logger := opts.Logger + if logger == nil { + logger = slog.Default() + } + delay := opts.RestartDelay + if delay <= 0 { + delay = 100 * time.Millisecond + } + return &Supervisor{ + def: def, + logger: logger.With("plugin", def.ID()), + restartDelay: delay, + } +} + +func (s *Supervisor) Start(ctx context.Context) (*Client, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.client != nil && s.client.Alive() { + return s.client, nil + } + + client, err := Start(ctx, s.def, StartOptions{Logger: s.logger}) + if err != nil { + return nil, err + } + s.client = client + return client, nil +} + +func (s *Supervisor) EnsureHealthy(ctx context.Context) (*Client, error) { + s.mu.Lock() + client := s.client + s.mu.Unlock() + + if client == nil || !client.Alive() { + return s.Restart(ctx) + } + if err := client.Health(ctx); err != nil { + return s.Restart(ctx) + } + return client, nil +} + +func (s *Supervisor) Restart(ctx context.Context) (*Client, error) { + s.mu.Lock() + old := s.client + s.client = nil + s.restarts++ + delay := s.restartDelay + s.mu.Unlock() + + if old != nil { + _ = old.Close() + } + time.Sleep(delay) + return s.Start(ctx) +} + +func (s *Supervisor) Close() error { + s.mu.Lock() + client := s.client + s.client = nil + s.mu.Unlock() + if client == nil { + return nil + } + return client.Close() +} + +func (s *Supervisor) RestartCount() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.restarts +} diff --git a/mise.toml b/mise.toml index 5919d5b8..ba98830a 100644 --- a/mise.toml +++ b/mise.toml @@ -9,6 +9,7 @@ go = "1.25" [env] ANNA_HOME = ".agents/anna" BINARY_NAME = "anna" +PLUGIN_BINARY_NAME = "anna-plugin" # ============================================================================ # Build @@ -17,7 +18,7 @@ BINARY_NAME = "anna" [tasks.build] description = "Build anna binary" depends = ["generate"] -run = "VERSION=${VERSION:-$(git describe --tags --always --dirty 2>/dev/null || echo dev)} && go build -ldflags=\"-X main.version=$VERSION\" -o bin/$BINARY_NAME ./cmd/anna" +run = "VERSION=${VERSION:-$(git describe --tags --always --dirty 2>/dev/null || echo dev)} && go build -ldflags=\"-X main.version=$VERSION\" -o bin/$BINARY_NAME ./cmd/anna && go build -o bin/$PLUGIN_BINARY_NAME ./cmd/anna-plugin" [tasks.clean] description = "Remove build artifacts"