diff --git a/CHANGELOG.md b/CHANGELOG.md index 271c696..31d47c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.7.1] - 2026-02-23 + +### Added +- **Agent Run/Stop control**: Separate agent lifecycle from WebUI — start/stop the agent at runtime without killing the server. New `AgentLifecycle` state machine (`stopped/starting/running/stopping`), REST endpoints (`POST /api/agent/start`, `/stop`, `GET /api/agent/status`), SSE endpoint (`GET /api/agent/events`) for real-time state push, `useAgentStatus` hook (SSE + polling fallback), and `AgentControl` sidebar component with confirmation dialog +- **MCP Streamable HTTP transport**: `StreamableHTTPClientTransport` as primary transport for URL-based MCP servers, with automatic fallback to `SSEClientTransport` on failure. `mcpServers` list is now a lazy function for live status. Resource cleanup (AbortController, sockets) on fallback. Improved error logging with stack traces + +### Fixed +- **WebUI setup wizard**: Neutralize color accent overuse — selection states, warning cards, tag pills, step dots all moved to neutral white/grey palette; security notice collapsed into `
`; "Optional Integrations" renamed to "Optional API Keys"; bot token marked as "(recommended)" +- **Jetton send**: Wrap entire `sendJetton` flow in try/catch for consistent `PluginSDKError` propagation; remove `SendMode.IGNORE_ERRORS` (errors are no longer silently swallowed); fix `||` → `??` on jetton decimals (prevents `0` decimals being replaced by `9`) + ## [0.7.0] - 2026-02-21 ### Added @@ -268,7 +278,8 @@ Git history rewritten to fix commit attribution (email update from `tonresistor@ - Professional distribution (npm, Docker, CI/CD) - Pre-commit hooks and linting infrastructure -[Unreleased]: https://github.com/TONresistor/teleton-agent/compare/v0.7.0...HEAD +[Unreleased]: https://github.com/TONresistor/teleton-agent/compare/v0.7.1...HEAD +[0.7.1]: https://github.com/TONresistor/teleton-agent/compare/v0.7.0...v0.7.1 [0.7.0]: https://github.com/TONresistor/teleton-agent/compare/v0.6.0...v0.7.0 [0.6.0]: https://github.com/TONresistor/teleton-agent/compare/v0.5.2...v0.6.0 [0.5.2]: https://github.com/TONresistor/teleton-agent/compare/v0.5.1...v0.5.2 diff --git a/README.md b/README.md index afe12b1..f51ce35 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -

Teleton Agent

+

+ Teleton Agent +

Autonomous AI agent platform for Telegram with native TON blockchain integration

@@ -7,21 +9,22 @@ Node.js TypeScript Website - Ask DeepWiki + Documentation + Built on TON

--- -

Teleton is an autonomous AI agent platform that operates as a real Telegram user account (not a bot). It thinks through an agentic loop with tool calling, remembers conversations across sessions with hybrid RAG, and natively integrates the TON blockchain: send crypto, swap on DEXs, bid on domains, verify payments - all from a chat message. It can schedule tasks to run autonomously at any time. It ships with 114 built-in tools, supports 6 LLM providers, and exposes a Plugin SDK so you can build your own tools on top of the platform.

+

Teleton is an autonomous AI agent platform that operates as a real Telegram user account (not a bot). It thinks through an agentic loop with tool calling, remembers conversations across sessions with hybrid RAG, and natively integrates the TON blockchain: send crypto, swap on DEXs, bid on domains, verify payments - all from a chat message. It can schedule tasks to run autonomously at any time. It ships with 100+ built-in tools, supports 10 LLM providers, and exposes a Plugin SDK so you can build your own tools on top of the platform.

### Key Highlights - **Full Telegram access** - Operates as a real user via MTProto (GramJS), not a limited bot - **Agentic loop** - Up to 5 iterations of tool calling per message, the agent thinks, acts, observes, and repeats -- **Multi-Provider LLM** - Anthropic, OpenAI, Google Gemini, xAI Grok, Groq, OpenRouter +- **Multi-Provider LLM** - Anthropic, OpenAI, Google Gemini, xAI Grok, Groq, OpenRouter, Moonshot, Mistral, Cocoon, Local - **TON Blockchain** - Built-in W5R1 wallet, send/receive TON & jettons, swap on STON.fi and DeDust, NFTs, DNS domains - **Persistent memory** - Hybrid RAG (sqlite-vec + FTS5), auto-compaction with AI summarization, daily logs -- **114 built-in tools** - Messaging, media, blockchain, DEX trading, deals, DNS, journaling, and more +- **100+ built-in tools** - Messaging, media, blockchain, DEX trading, deals, DNS, journaling, and more - **Plugin SDK** - Extend the agent with custom tools, frozen SDK with isolated databases, secrets management, lifecycle hooks - **MCP Client** - Connect external tool servers (stdio/SSE) with 2 lines of YAML, no code, no rebuild - **Secure by design** - Prompt injection defense, sandboxed workspace, plugin isolation, wallet encryption @@ -48,7 +51,7 @@ | Capability | Description | | ----------------------- | --------------------------------------------------------------------------------------------------------------------------- | -| **Multi-Provider LLM** | Switch between Anthropic, OpenAI, Google, xAI, Groq, OpenRouter with one config change | +| **Multi-Provider LLM** | Switch between Anthropic, OpenAI, Google, xAI, Groq, OpenRouter, Moonshot, Mistral, Cocoon, or Local with one config change | | **RAG + Hybrid Search** | Local ONNX embeddings (384d) or Voyage AI (512d/1024d) with FTS5 keyword + sqlite-vec cosine similarity, fused via RRF | | **Auto-Compaction** | AI-summarized context management prevents overflow, preserves key information in `memory/*.md` files | | **Observation Masking** | Compresses old tool results to one-line summaries, saving ~90% context window | @@ -405,7 +408,7 @@ src/ ├── agent/ # Core agent runtime │ ├── runtime.ts # Agentic loop (5 iterations, tool calling, masking, compaction) │ ├── client.ts # Multi-provider LLM client -│ └── tools/ # 114 built-in tools +│ └── tools/ # 100+ built-in tools │ ├── register-all.ts # Central tool registration (8 categories, 109 tools) │ ├── registry.ts # Tool registry, scope filtering, provider limits │ ├── module-loader.ts # Built-in module loading (deals → +5 tools) @@ -460,7 +463,7 @@ src/ │ └── loader.ts # 10 sections: soul + security + strategy + memory + context + ... ├── config/ # Configuration │ ├── schema.ts # Zod schemas + validation -│ └── providers.ts # Multi-provider LLM registry (6 providers) +│ └── providers.ts # Multi-provider LLM registry (10 providers) ├── webui/ # Optional web dashboard │ ├── server.ts # Hono server, auth middleware, static serving │ └── routes/ # 11 API route groups (status, tools, logs, memory, soul, plugins, mcp, tasks, workspace, config, marketplace) diff --git a/package.json b/package.json index d187acc..7c42622 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "teleton", - "version": "0.7.0", + "version": "0.7.2", "workspaces": [ "packages/*" ], diff --git a/src/agent/__tests__/lifecycle-e2e.test.ts b/src/agent/__tests__/lifecycle-e2e.test.ts new file mode 100644 index 0000000..b9e047a --- /dev/null +++ b/src/agent/__tests__/lifecycle-e2e.test.ts @@ -0,0 +1,532 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { Hono } from "hono"; +import { streamSSE } from "hono/streaming"; + +vi.mock("../../utils/logger.js", () => ({ + createLogger: vi.fn(() => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + })), +})); + +import { AgentLifecycle, type AgentState, type StateChangeEvent } from "../lifecycle.js"; + +// ── Helpers ────────────────────────────────────────────────────────────── + +/** Parse SSE text into structured events */ +function parseSSE(text: string): Array<{ event?: string; data?: string; id?: string }> { + const events: Array<{ event?: string; data?: string; id?: string }> = []; + const blocks = text.split("\n\n").filter(Boolean); + for (const block of blocks) { + const entry: { event?: string; data?: string; id?: string } = {}; + for (const line of block.split("\n")) { + if (line.startsWith("event:")) entry.event = line.slice(6).trim(); + else if (line.startsWith("data:")) entry.data = line.slice(5).trim(); + else if (line.startsWith("id:")) entry.id = line.slice(3).trim(); + } + if (entry.event || entry.data) events.push(entry); + } + return events; +} + +/** Wait for lifecycle to reach a specific state */ +function waitForState( + lifecycle: AgentLifecycle, + target: AgentState, + timeoutMs = 2000 +): Promise { + return new Promise((resolve, reject) => { + if (lifecycle.getState() === target) { + resolve(); + return; + } + const timer = setTimeout(() => { + lifecycle.off("stateChange", handler); + reject( + new Error(`Timeout waiting for state "${target}", current: "${lifecycle.getState()}"`) + ); + }, timeoutMs); + const handler = (event: StateChangeEvent) => { + if (event.state === target) { + clearTimeout(timer); + lifecycle.off("stateChange", handler); + resolve(); + } + }; + lifecycle.on("stateChange", handler); + }); +} + +/** + * Build a full Hono app mirroring server.ts agent routes + SSE + a mock /health endpoint. + * This is the "WebUI" portion for E2E testing. + */ +function createE2EApp(lifecycle: AgentLifecycle) { + const app = new Hono(); + + // Health check (always works, even when agent is stopped) + app.get("/health", (c) => c.json({ status: "ok" })); + + // Mock data endpoints (simulate WebUI pages that work when agent is stopped) + app.get("/api/status", (c) => + c.json({ success: true, data: { uptime: 42, model: "test", provider: "test" } }) + ); + app.get("/api/tools", (c) => + c.json({ success: true, data: [{ name: "test_tool", module: "core" }] }) + ); + app.get("/api/memory", (c) => c.json({ success: true, data: { messages: 10, knowledge: 5 } })); + app.get("/api/config", (c) => + c.json({ success: true, data: { agent: { model: "test-model" } } }) + ); + + // Agent lifecycle REST routes + app.post("/api/agent/start", async (c) => { + if (!lifecycle) { + return c.json({ error: "Agent lifecycle not available" }, 503); + } + const state = lifecycle.getState(); + if (state === "running") { + return c.json({ state: "running" }, 409); + } + if (state === "stopping") { + return c.json({ error: "Agent is currently stopping, please wait" }, 409); + } + lifecycle.start().catch(() => {}); + return c.json({ state: "starting" }); + }); + + app.post("/api/agent/stop", async (c) => { + if (!lifecycle) { + return c.json({ error: "Agent lifecycle not available" }, 503); + } + const state = lifecycle.getState(); + if (state === "stopped") { + return c.json({ state: "stopped" }, 409); + } + if (state === "starting") { + return c.json({ error: "Agent is currently starting, please wait" }, 409); + } + lifecycle.stop().catch(() => {}); + return c.json({ state: "stopping" }); + }); + + app.get("/api/agent/status", (c) => { + if (!lifecycle) { + return c.json({ error: "Agent lifecycle not available" }, 503); + } + return c.json({ + state: lifecycle.getState(), + uptime: lifecycle.getUptime(), + error: lifecycle.getError() ?? null, + }); + }); + + // SSE endpoint + app.get("/api/agent/events", (c) => { + return streamSSE(c, async (stream) => { + let aborted = false; + stream.onAbort(() => { + aborted = true; + }); + + const now = Date.now(); + await stream.writeSSE({ + event: "status", + id: String(now), + data: JSON.stringify({ + state: lifecycle.getState(), + error: lifecycle.getError() ?? null, + timestamp: now, + }), + retry: 3000, + }); + + const onStateChange = (event: StateChangeEvent) => { + if (aborted) return; + stream.writeSSE({ + event: "status", + id: String(event.timestamp), + data: JSON.stringify({ + state: event.state, + error: event.error ?? null, + timestamp: event.timestamp, + }), + }); + }; + + lifecycle.on("stateChange", onStateChange); + + // Short sleep for E2E tests (don't loop forever) + await stream.sleep(100); + + lifecycle.off("stateChange", onStateChange); + }); + }); + + return app; +} + +// ── E2E Tests ──────────────────────────────────────────────────────────── + +describe("Agent Lifecycle E2E", () => { + let lifecycle: AgentLifecycle; + let app: Hono; + let startCallCount: number; + let stopCallCount: number; + let startFn: () => Promise; + let stopFn: () => Promise; + + beforeEach(() => { + startCallCount = 0; + stopCallCount = 0; + + startFn = async () => { + startCallCount++; + }; + stopFn = async () => { + stopCallCount++; + }; + + lifecycle = new AgentLifecycle(); + lifecycle.registerCallbacks(startFn, stopFn); + app = createE2EApp(lifecycle); + }); + + afterEach(async () => { + // Ensure lifecycle is stopped to clean up listeners + if (lifecycle.getState() === "running") { + await lifecycle.stop(); + } + }); + + // ── Scenario 1: Full lifecycle start → stop → restart ── + + it("full lifecycle: start → stop → restart (WebUI survives)", async () => { + // 1. Initial state: stopped + let res = await app.request("/api/agent/status"); + let data = await res.json(); + expect(data.state).toBe("stopped"); + + // 2. Start agent via API + res = await app.request("/api/agent/start", { method: "POST" }); + data = await res.json(); + expect(res.status).toBe(200); + expect(data.state).toBe("starting"); + + // Wait for start to complete + await waitForState(lifecycle, "running"); + expect(lifecycle.getState()).toBe("running"); + expect(startCallCount).toBe(1); + + // 3. Verify status shows running with uptime + res = await app.request("/api/agent/status"); + data = await res.json(); + expect(data.state).toBe("running"); + expect(typeof data.uptime).toBe("number"); + + // 4. Stop agent via API + res = await app.request("/api/agent/stop", { method: "POST" }); + data = await res.json(); + expect(res.status).toBe(200); + expect(data.state).toBe("stopping"); + + await waitForState(lifecycle, "stopped"); + expect(lifecycle.getState()).toBe("stopped"); + expect(stopCallCount).toBe(1); + + // 5. WebUI still responds (health check) + res = await app.request("/health"); + expect(res.status).toBe(200); + data = await res.json(); + expect(data.status).toBe("ok"); + + // 6. Restart agent + res = await app.request("/api/agent/start", { method: "POST" }); + expect(res.status).toBe(200); + + await waitForState(lifecycle, "running"); + expect(lifecycle.getState()).toBe("running"); + expect(startCallCount).toBe(2); + + // 7. Stop again for cleanup + await lifecycle.stop(); + expect(stopCallCount).toBe(2); + }); + + // ── Scenario 2: Stop during active processing (graceful drain) ── + + it("stop waits for start to complete before stopping", async () => { + // Simulate a slow start (like connecting to Telegram) + let resolveStart!: () => void; + lifecycle.registerCallbacks( + () => + new Promise((resolve) => { + resolveStart = resolve; + }), + stopFn + ); + + // Start agent (will be pending) + const startRes = await app.request("/api/agent/start", { method: "POST" }); + expect(startRes.status).toBe(200); + expect(lifecycle.getState()).toBe("starting"); + + // Try to stop while starting — should get 409 + const stopRes = await app.request("/api/agent/stop", { method: "POST" }); + expect(stopRes.status).toBe(409); + + // Complete the start + resolveStart(); + await waitForState(lifecycle, "running"); + + // Now stop works + const stopRes2 = await app.request("/api/agent/stop", { method: "POST" }); + expect(stopRes2.status).toBe(200); + await waitForState(lifecycle, "stopped"); + }); + + // ── Scenario 3: Start failure ── + + it("start failure sets error and allows retry", async () => { + let callCount = 0; + lifecycle.registerCallbacks(async () => { + callCount++; + if (callCount <= 2) { + throw new Error(`Telegram auth expired (attempt ${callCount})`); + } + // Third attempt succeeds + }, stopFn); + + // First attempt: fails + const res1 = await app.request("/api/agent/start", { method: "POST" }); + expect(res1.status).toBe(200); + + await waitForState(lifecycle, "stopped"); + expect(lifecycle.getError()).toContain("Telegram auth expired (attempt 1)"); + + // Status shows error + const statusRes = await app.request("/api/agent/status"); + const status = await statusRes.json(); + expect(status.state).toBe("stopped"); + expect(status.error).toContain("attempt 1"); + + // Second attempt: fails + const res2 = await app.request("/api/agent/start", { method: "POST" }); + expect(res2.status).toBe(200); + await waitForState(lifecycle, "stopped"); + expect(lifecycle.getError()).toContain("attempt 2"); + + // Third attempt: succeeds + const res3 = await app.request("/api/agent/start", { method: "POST" }); + expect(res3.status).toBe(200); + await waitForState(lifecycle, "running"); + expect(lifecycle.getError()).toBeUndefined(); + expect(lifecycle.getState()).toBe("running"); + + // Cleanup + await lifecycle.stop(); + }); + + // ── Scenario 4: SSE delivers correct state on reconnection ── + + it("SSE reconnection delivers correct state", async () => { + // Start agent + await lifecycle.start(); + expect(lifecycle.getState()).toBe("running"); + + // Connect SSE — should get "running" as initial state + let res = await app.request("/api/agent/events"); + let text = await res.text(); + let events = parseSSE(text); + expect(events.length).toBeGreaterThanOrEqual(1); + let firstData = JSON.parse(events[0].data!); + expect(firstData.state).toBe("running"); + + // Stop agent + await lifecycle.stop(); + expect(lifecycle.getState()).toBe("stopped"); + + // "Reconnect" SSE — should get "stopped" as initial state + res = await app.request("/api/agent/events"); + text = await res.text(); + events = parseSSE(text); + expect(events.length).toBeGreaterThanOrEqual(1); + firstData = JSON.parse(events[0].data!); + expect(firstData.state).toBe("stopped"); + }); + + // ── Scenario 5: Concurrent start/stop calls are safe ── + + it("concurrent start calls return same promise (no race)", async () => { + // Fire two starts simultaneously + const [res1, res2] = await Promise.all([ + app.request("/api/agent/start", { method: "POST" }), + app.request("/api/agent/start", { method: "POST" }), + ]); + + const data1 = await res1.json(); + const data2 = await res2.json(); + + // First gets 200 starting, second should get 200 starting or 409 running + // (depends on timing — both are valid) + expect([200, 409]).toContain(res1.status); + expect([200, 409]).toContain(res2.status); + + await waitForState(lifecycle, "running"); + + // Agent started exactly once + expect(startCallCount).toBe(1); + + // Cleanup + await lifecycle.stop(); + }); + + it("concurrent stop calls after running are safe", async () => { + await lifecycle.start(); + + const [res1, res2] = await Promise.all([ + app.request("/api/agent/stop", { method: "POST" }), + app.request("/api/agent/stop", { method: "POST" }), + ]); + + // One should get 200, the other might get 200 or 409 (already stopping) + expect([200, 409]).toContain(res1.status); + expect([200, 409]).toContain(res2.status); + + await waitForState(lifecycle, "stopped"); + + // Agent stopped exactly once + expect(stopCallCount).toBe(1); + }); + + // ── Scenario 6: Config reload on restart ── + + it("startFn is called on each start (config reload opportunity)", async () => { + const models: string[] = []; + let currentModel = "gpt-4"; + + lifecycle.registerCallbacks(async () => { + // Simulate reading config from disk on each start + models.push(currentModel); + }, stopFn); + + // First start: uses gpt-4 + await lifecycle.start(); + expect(models).toEqual(["gpt-4"]); + + await lifecycle.stop(); + + // "Edit config" while stopped + currentModel = "claude-opus-4-6"; + + // Second start: picks up new config + await lifecycle.start(); + expect(models).toEqual(["gpt-4", "claude-opus-4-6"]); + + await lifecycle.stop(); + }); + + // ── Scenario 7: Graceful shutdown (lifecycle + WebUI) ── + + it("full stop tears down agent then WebUI stays up", async () => { + const teardownOrder: string[] = []; + + lifecycle.registerCallbacks(startFn, async () => { + teardownOrder.push("agent-stopped"); + }); + + await lifecycle.start(); + expect(lifecycle.getState()).toBe("running"); + + // Simulate graceful shutdown: stop lifecycle first + await lifecycle.stop(); + teardownOrder.push("webui-still-up"); + + // WebUI is still responding + const res = await app.request("/health"); + expect(res.status).toBe(200); + + expect(teardownOrder).toEqual(["agent-stopped", "webui-still-up"]); + expect(lifecycle.getState()).toBe("stopped"); + }); + + // ── Scenario 8: WebUI pages accessible while agent stopped ── + + it("all WebUI data endpoints respond while agent is stopped", async () => { + // Agent is stopped — verify all data endpoints still work + expect(lifecycle.getState()).toBe("stopped"); + + const endpoints = [ + "/health", + "/api/status", + "/api/tools", + "/api/memory", + "/api/config", + "/api/agent/status", + ]; + + for (const endpoint of endpoints) { + const res = await app.request(endpoint); + expect(res.status).toBe(200); + const data = await res.json(); + expect(data).toBeDefined(); + } + + // Agent lifecycle routes also work + const statusRes = await app.request("/api/agent/status"); + const status = await statusRes.json(); + expect(status.state).toBe("stopped"); + expect(status.uptime).toBeNull(); + }); + + // ── Extra: SSE emits events during start→stop sequence ── + + it("SSE captures full start→stop state transition sequence", async () => { + // Build a custom SSE app that collects events during a start→stop cycle + const sseApp = new Hono(); + sseApp.get("/events", (c) => { + return streamSSE(c, async (stream) => { + let aborted = false; + stream.onAbort(() => { + aborted = true; + }); + + const collected: StateChangeEvent[] = []; + + // Push initial + await stream.writeSSE({ + event: "status", + data: JSON.stringify({ state: lifecycle.getState() }), + }); + + const onStateChange = (event: StateChangeEvent) => { + if (aborted) return; + collected.push(event); + stream.writeSSE({ + event: "status", + data: JSON.stringify({ state: event.state, error: event.error ?? null }), + }); + }; + + lifecycle.on("stateChange", onStateChange); + + // Trigger start → stop during stream + await lifecycle.start(); + await lifecycle.stop(); + + await stream.sleep(50); + lifecycle.off("stateChange", onStateChange); + }); + }); + + const res = await sseApp.request("/events"); + const text = await res.text(); + const events = parseSSE(text); + const states = events.map((e) => JSON.parse(e.data!).state); + + // Should capture: stopped (initial) → starting → running → stopping → stopped + expect(states).toEqual(["stopped", "starting", "running", "stopping", "stopped"]); + }); +}); diff --git a/src/agent/__tests__/lifecycle.test.ts b/src/agent/__tests__/lifecycle.test.ts new file mode 100644 index 0000000..fc24afc --- /dev/null +++ b/src/agent/__tests__/lifecycle.test.ts @@ -0,0 +1,321 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +vi.mock("../../utils/logger.js", () => ({ + createLogger: vi.fn(() => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + })), +})); + +import { AgentLifecycle, type StateChangeEvent } from "../lifecycle.js"; + +describe("AgentLifecycle", () => { + let lifecycle: AgentLifecycle; + + beforeEach(() => { + lifecycle = new AgentLifecycle(); + }); + + // 1. Initial state is stopped + it("initial state is stopped", () => { + expect(lifecycle.getState()).toBe("stopped"); + }); + + // 2. start() transitions to starting then running + it("start() transitions to starting then running", async () => { + const events: StateChangeEvent[] = []; + lifecycle.on("stateChange", (e: StateChangeEvent) => events.push(e)); + + await lifecycle.start(async () => {}); + + expect(events).toHaveLength(2); + expect(events[0].state).toBe("starting"); + expect(events[1].state).toBe("running"); + expect(lifecycle.getState()).toBe("running"); + }); + + // 3. start() when already running is no-op + it("start() when already running is no-op", async () => { + await lifecycle.start(async () => {}); + expect(lifecycle.getState()).toBe("running"); + + const events: StateChangeEvent[] = []; + lifecycle.on("stateChange", (e: StateChangeEvent) => events.push(e)); + + await lifecycle.start(async () => {}); + expect(events).toHaveLength(0); + }); + + // 4. start() when already starting returns same promise + it("start() when already starting returns same promise", async () => { + let resolveStart!: () => void; + const startFn = () => + new Promise((resolve) => { + resolveStart = resolve; + }); + + const p1 = lifecycle.start(startFn); + const p2 = lifecycle.start(async () => {}); + + resolveStart(); + await p1; + await p2; + + expect(lifecycle.getState()).toBe("running"); + }); + + // 5. start() when stopping throws + it("start() when stopping throws", async () => { + let resolveStop!: () => void; + await lifecycle.start(async () => {}); + + const stopPromise = lifecycle.stop( + () => + new Promise((resolve) => { + resolveStop = resolve; + }) + ); + + await expect(lifecycle.start(async () => {})).rejects.toThrow( + "Cannot start while agent is stopping" + ); + + resolveStop(); + await stopPromise; + }); + + // 6. stop() transitions to stopping then stopped + it("stop() transitions to stopping then stopped", async () => { + await lifecycle.start(async () => {}); + + const events: StateChangeEvent[] = []; + lifecycle.on("stateChange", (e: StateChangeEvent) => events.push(e)); + + await lifecycle.stop(async () => {}); + + expect(events).toHaveLength(2); + expect(events[0].state).toBe("stopping"); + expect(events[1].state).toBe("stopped"); + expect(lifecycle.getState()).toBe("stopped"); + }); + + // 7. stop() when already stopped is no-op + it("stop() when already stopped is no-op", async () => { + const events: StateChangeEvent[] = []; + lifecycle.on("stateChange", (e: StateChangeEvent) => events.push(e)); + + await lifecycle.stop(async () => {}); + expect(events).toHaveLength(0); + }); + + // 8. stop() when already stopping returns same promise + it("stop() when already stopping returns same promise", async () => { + await lifecycle.start(async () => {}); + + let resolveStop!: () => void; + const stopFn = () => + new Promise((resolve) => { + resolveStop = resolve; + }); + + const p1 = lifecycle.stop(stopFn); + const p2 = lifecycle.stop(async () => {}); + + resolveStop(); + await p1; + await p2; + + expect(lifecycle.getState()).toBe("stopped"); + }); + + // 9. stop() when starting waits for start then stops + it("stop() when starting waits for start then stops", async () => { + let resolveStart!: () => void; + const startFn = () => + new Promise((resolve) => { + resolveStart = resolve; + }); + + const startPromise = lifecycle.start(startFn); + + const events: StateChangeEvent[] = []; + lifecycle.on("stateChange", (e: StateChangeEvent) => events.push(e)); + + const stopPromise = lifecycle.stop(async () => {}); + + // Start hasn't resolved yet, lifecycle should still be starting + expect(lifecycle.getState()).toBe("starting"); + + resolveStart(); + await startPromise; + await stopPromise; + + expect(lifecycle.getState()).toBe("stopped"); + // Events should show: running, stopping, stopped (starting was already emitted before listener) + expect(events.map((e) => e.state)).toEqual(["running", "stopping", "stopped"]); + }); + + // 10. Failed start() reverts to stopped with error + it("failed start() reverts to stopped with error", async () => { + const events: StateChangeEvent[] = []; + lifecycle.on("stateChange", (e: StateChangeEvent) => events.push(e)); + + await expect( + lifecycle.start(async () => { + throw new Error("Telegram auth expired"); + }) + ).rejects.toThrow("Telegram auth expired"); + + expect(lifecycle.getState()).toBe("stopped"); + expect(lifecycle.getError()).toBe("Telegram auth expired"); + expect(events).toHaveLength(2); + expect(events[0].state).toBe("starting"); + expect(events[1].state).toBe("stopped"); + expect(events[1].error).toBe("Telegram auth expired"); + }); + + // 11. start() after failed start works and clears error + it("start() after failed start works and clears error", async () => { + await lifecycle + .start(async () => { + throw new Error("fail"); + }) + .catch(() => {}); + + expect(lifecycle.getError()).toBe("fail"); + + await lifecycle.start(async () => {}); + + expect(lifecycle.getState()).toBe("running"); + expect(lifecycle.getError()).toBeUndefined(); + }); + + // 12. stateChange events include correct payload + it("stateChange events include correct payload", async () => { + const events: StateChangeEvent[] = []; + lifecycle.on("stateChange", (e: StateChangeEvent) => events.push(e)); + + await lifecycle.start(async () => {}); + + for (const event of events) { + expect(event).toHaveProperty("state"); + expect(event).toHaveProperty("timestamp"); + expect(typeof event.timestamp).toBe("number"); + expect(event.timestamp).toBeGreaterThan(0); + } + }); + + // 13. Subsystems are started in correct order (mock tracks call order) + it("subsystems are started in correct order", async () => { + const order: string[] = []; + const startFn = async () => { + order.push("plugins"); + order.push("mcp"); + order.push("telegram"); + order.push("modules"); + order.push("debouncer"); + }; + + await lifecycle.start(startFn); + expect(order).toEqual(["plugins", "mcp", "telegram", "modules", "debouncer"]); + }); + + // 14. Subsystems are stopped in reverse order + it("subsystems are stopped in reverse order", async () => { + await lifecycle.start(async () => {}); + + const order: string[] = []; + const stopFn = async () => { + order.push("watcher"); + order.push("mcp"); + order.push("debouncer"); + order.push("handler"); + order.push("modules"); + order.push("bridge"); + }; + + await lifecycle.stop(stopFn); + expect(order).toEqual(["watcher", "mcp", "debouncer", "handler", "modules", "bridge"]); + }); + + // 15. Individual subsystem failure during stop doesn't cascade + it("individual subsystem failure during stop does not cascade", async () => { + await lifecycle.start(async () => {}); + + const completed: string[] = []; + const stopFn = async () => { + completed.push("step1"); + // Simulate a failure in one subsystem + try { + throw new Error("MCP close failed"); + } catch { + // Error handled internally + } + completed.push("step2"); + completed.push("step3"); + }; + + await lifecycle.stop(stopFn); + expect(lifecycle.getState()).toBe("stopped"); + expect(completed).toEqual(["step1", "step2", "step3"]); + }); + + // 16. getUptime() returns seconds when running, null when stopped + it("getUptime() returns seconds when running, null when stopped", async () => { + expect(lifecycle.getUptime()).toBeNull(); + + await lifecycle.start(async () => {}); + + const uptime = lifecycle.getUptime(); + expect(uptime).not.toBeNull(); + expect(typeof uptime).toBe("number"); + expect(uptime).toBeGreaterThanOrEqual(0); + + await lifecycle.stop(async () => {}); + expect(lifecycle.getUptime()).toBeNull(); + }); + + // 17. getError() returns null after successful start + it("getError() returns undefined after successful start", async () => { + // First, fail a start + await lifecycle + .start(async () => { + throw new Error("initial failure"); + }) + .catch(() => {}); + + expect(lifecycle.getError()).toBe("initial failure"); + + // Successful start clears error + await lifecycle.start(async () => {}); + expect(lifecycle.getError()).toBeUndefined(); + }); + + // Extra: registerCallbacks + no-arg start/stop + it("start()/stop() work with registered callbacks", async () => { + const startFn = vi.fn(async () => {}); + const stopFn = vi.fn(async () => {}); + lifecycle.registerCallbacks(startFn, stopFn); + + await lifecycle.start(); + expect(startFn).toHaveBeenCalledOnce(); + expect(lifecycle.getState()).toBe("running"); + + await lifecycle.stop(); + expect(stopFn).toHaveBeenCalledOnce(); + expect(lifecycle.getState()).toBe("stopped"); + }); + + it("start() without callback or registration throws", async () => { + await expect(lifecycle.start()).rejects.toThrow("No start function provided or registered"); + }); + + it("stop() without callback or registration throws when not stopped", async () => { + await lifecycle.start(async () => {}); + // Now try stop() with no registered callback + lifecycle["registeredStopFn"] = null; + await expect(lifecycle.stop()).rejects.toThrow("No stop function provided or registered"); + }); +}); diff --git a/src/agent/lifecycle.ts b/src/agent/lifecycle.ts new file mode 100644 index 0000000..f25dbfb --- /dev/null +++ b/src/agent/lifecycle.ts @@ -0,0 +1,151 @@ +import { EventEmitter } from "node:events"; +import { createLogger } from "../utils/logger.js"; + +const log = createLogger("Lifecycle"); + +export type AgentState = "stopped" | "starting" | "running" | "stopping"; + +export interface StateChangeEvent { + state: AgentState; + error?: string; + timestamp: number; +} + +export class AgentLifecycle extends EventEmitter { + private state: AgentState = "stopped"; + private error: string | undefined; + private startPromise: Promise | null = null; + private stopPromise: Promise | null = null; + private runningSince: number | null = null; + private registeredStartFn: (() => Promise) | null = null; + private registeredStopFn: (() => Promise) | null = null; + + getState(): AgentState { + return this.state; + } + + getError(): string | undefined { + return this.error; + } + + getUptime(): number | null { + if (this.state !== "running" || this.runningSince === null) { + return null; + } + return Math.floor((Date.now() - this.runningSince) / 1000); + } + + /** + * Register the start/stop callbacks so start()/stop() can be called without args. + */ + registerCallbacks(startFn: () => Promise, stopFn: () => Promise): void { + this.registeredStartFn = startFn; + this.registeredStopFn = stopFn; + } + + /** + * Start the agent. Uses the provided callback or falls back to registered one. + * - No-op if already running + * - Returns existing promise if already starting + * - Throws if currently stopping + */ + async start(startFn?: () => Promise): Promise { + const fn = startFn ?? this.registeredStartFn; + if (!fn) { + throw new Error("No start function provided or registered"); + } + + if (this.state === "running") { + return; + } + + if (this.state === "starting") { + return this.startPromise!; + } + + if (this.state === "stopping") { + throw new Error("Cannot start while agent is stopping"); + } + + this.transition("starting"); + + this.startPromise = (async () => { + try { + await fn(); + this.error = undefined; + this.runningSince = Date.now(); + this.transition("running"); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.error = message; + this.runningSince = null; + this.transition("stopped", message); + throw err; + } finally { + this.startPromise = null; + } + })(); + + return this.startPromise; + } + + /** + * Stop the agent. Uses the provided callback or falls back to registered one. + * - No-op if already stopped + * - Returns existing promise if already stopping + * - If starting, waits for start to complete then stops + */ + async stop(stopFn?: () => Promise): Promise { + const fn = stopFn ?? this.registeredStopFn; + if (!fn) { + throw new Error("No stop function provided or registered"); + } + + if (this.state === "stopped") { + return; + } + + if (this.state === "stopping") { + return this.stopPromise!; + } + + // If currently starting, wait for start to finish first + if (this.state === "starting" && this.startPromise) { + try { + await this.startPromise; + } catch { + // Start failed — agent is already stopped + return; + } + } + + this.transition("stopping"); + + this.stopPromise = (async () => { + try { + await fn(); + } catch (err) { + log.error({ err }, "Error during agent stop"); + } finally { + this.runningSince = null; + this.transition("stopped"); + this.stopPromise = null; + } + })(); + + return this.stopPromise; + } + + private transition(newState: AgentState, error?: string): void { + this.state = newState; + const event: StateChangeEvent = { + state: newState, + timestamp: Date.now(), + }; + if (error !== undefined) { + event.error = error; + } + log.info(`Agent state: ${newState}${error ? ` (${error})` : ""}`); + this.emit("stateChange", event); + } +} diff --git a/src/agent/tools/mcp-loader.ts b/src/agent/tools/mcp-loader.ts index 6993709..0d3f43c 100644 --- a/src/agent/tools/mcp-loader.ts +++ b/src/agent/tools/mcp-loader.ts @@ -8,6 +8,7 @@ import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; +import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; import { sanitizeForContext } from "../../utils/sanitize.js"; import type { Tool, ToolExecutor, ToolResult, ToolScope } from "./types.js"; import type { ToolRegistry } from "./registry.js"; @@ -95,24 +96,53 @@ export async function loadMcpServers(config: McpConfig): Promise; - await Promise.race([ - client.connect(transport), - new Promise((_, reject) => { - timeoutHandle = setTimeout( - () => reject(new Error(`Connection timed out after ${MCP_CONNECT_TIMEOUT_MS / 1000}s`)), - MCP_CONNECT_TIMEOUT_MS - ); - }), - ]).finally(() => clearTimeout(timeoutHandle)); + try { + await Promise.race([ + client.connect(transport), + new Promise((_, reject) => { + timeoutHandle = setTimeout( + () => + reject(new Error(`Connection timed out after ${MCP_CONNECT_TIMEOUT_MS / 1000}s`)), + MCP_CONNECT_TIMEOUT_MS + ); + }), + ]).finally(() => clearTimeout(timeoutHandle)); + } catch (err) { + // If Streamable HTTP failed on a URL server, retry with SSE + if (serverConfig.url && transport instanceof StreamableHTTPClientTransport) { + await client.close().catch(() => {}); + log.info({ server: name }, "Streamable HTTP failed, falling back to SSE"); + transport = new SSEClientTransport(new URL(serverConfig.url)); + const fallbackClient = new Client({ name: `teleton-${name}`, version: "1.0.0" }); + await Promise.race([ + fallbackClient.connect(transport), + new Promise((_, reject) => { + timeoutHandle = setTimeout( + () => + reject( + new Error(`SSE fallback timed out after ${MCP_CONNECT_TIMEOUT_MS / 1000}s`) + ), + MCP_CONNECT_TIMEOUT_MS + ); + }), + ]).finally(() => clearTimeout(timeoutHandle)); + return { + serverName: name, + client: fallbackClient, + scope: serverConfig.scope ?? "always", + }; + } + throw err; + } return { serverName: name, client, scope: serverConfig.scope ?? "always" }; }) @@ -125,9 +155,11 @@ export async function loadMcpServers(config: McpConfig): Promise this.startAgent(), + () => this.stopAgent() + ); + + // Start WebUI server if enabled (before agent — survives agent stop/restart) + if (this.config.webui.enabled) { + try { + const { WebUIServer } = await import("./webui/server.js"); + // Build MCP server info getter for WebUI (live status, not a snapshot) + const mcpServers = () => + Object.entries(this.config.mcp.servers).map(([name, serverConfig]) => { + const type = serverConfig.command + ? ("stdio" as const) + : serverConfig.url + ? ("streamable-http" as const) + : ("sse" as const); + const target = serverConfig.command ?? serverConfig.url ?? ""; + const connected = this.mcpConnections.some((c) => c.serverName === name); + const moduleName = `mcp_${name}`; + const moduleTools = this.toolRegistry.getModuleTools(moduleName); + return { + name, + type, + target, + scope: serverConfig.scope ?? "always", + enabled: serverConfig.enabled ?? true, + connected, + toolCount: moduleTools.length, + tools: moduleTools.map((t) => t.name), + envKeys: Object.keys(serverConfig.env ?? {}), + }; + }); + + const builtinNames = this.modules.map((m) => m.name); + const pluginContext: PluginContext = { + bridge: this.bridge, + db: getDatabase().getDb(), + config: this.config, + }; + + this.webuiServer = new WebUIServer({ + agent: this.agent, + bridge: this.bridge, + memory: this.memory, + toolRegistry: this.toolRegistry, + plugins: this.modules + .filter((m) => this.toolRegistry.isPluginModule(m.name)) + .map((m) => ({ name: m.name, version: m.version ?? "0.0.0" })), + mcpServers, + config: this.config.webui, + configPath: this.configPath, + lifecycle: this.lifecycle, + marketplace: { + modules: this.modules, + config: this.config, + sdkDeps: this.sdkDeps, + pluginContext, + loadedModuleNames: builtinNames, + rewireHooks: () => this.wirePluginEventHooks(), + }, + }); + await this.webuiServer.start(); + } catch (error) { + log.error({ err: error }, "❌ Failed to start WebUI server"); + log.warn("⚠️ Continuing without WebUI..."); + } + } + + // Start agent subsystems via lifecycle + await this.lifecycle.start(() => this.startAgent()); + + // Keep process alive + await new Promise(() => {}); + } + + /** + * Start agent subsystems (Telegram, plugins, MCP, modules, debouncer, handler). + * Called by lifecycle.start() — do NOT call directly. + */ + private async startAgent(): Promise { // Load modules const moduleNames = this.modules .filter((m) => m.tools(this.config).length > 0) @@ -275,14 +366,15 @@ ${blue} ┌────────────────────── `Cocoon Network unavailable on port ${this.config.cocoon?.port ?? 10000}: ${getErrorMessage(err)}` ); log.error("Start the Cocoon client first: cocoon start"); - process.exit(1); + throw new Error(`Cocoon Network unavailable: ${getErrorMessage(err)}`); } } // Local LLM — register models from OpenAI-compatible server if (this.config.agent.provider === "local" && !this.config.agent.base_url) { - log.error("Local provider requires base_url in config (e.g. http://localhost:11434/v1)"); - process.exit(1); + throw new Error( + "Local provider requires base_url in config (e.g. http://localhost:11434/v1)" + ); } if (this.config.agent.provider === "local" && this.config.agent.base_url) { try { @@ -302,7 +394,7 @@ ${blue} ┌────────────────────── `Local LLM server unavailable at ${this.config.agent.base_url}: ${getErrorMessage(err)}` ); log.error("Start the LLM server first (e.g. ollama serve)"); - process.exit(1); + throw new Error(`Local LLM server unavailable: ${getErrorMessage(err)}`); } } @@ -310,8 +402,7 @@ ${blue} ┌────────────────────── await this.bridge.connect(); if (!this.bridge.isAvailable()) { - log.error("❌ Failed to connect to Telegram"); - process.exit(1); + throw new Error("Failed to connect to Telegram"); } // Resolve owner name/username from Telegram if not already set @@ -385,57 +476,6 @@ ${blue} ┌────────────────────── log.info("Teleton Agent is running! Press Ctrl+C to stop."); - // Start WebUI server if enabled - if (this.config.webui.enabled) { - try { - const { WebUIServer } = await import("./webui/server.js"); - // Build MCP server info for WebUI - const mcpServers = Object.entries(this.config.mcp.servers).map(([name, serverConfig]) => { - const type = serverConfig.command ? ("stdio" as const) : ("sse" as const); - const target = serverConfig.command ?? serverConfig.url ?? ""; - const connected = this.mcpConnections.some((c) => c.serverName === name); - const moduleName = `mcp_${name}`; - const moduleTools = this.toolRegistry.getModuleTools(moduleName); - return { - name, - type, - target, - scope: serverConfig.scope ?? "always", - enabled: serverConfig.enabled ?? true, - connected, - toolCount: moduleTools.length, - tools: moduleTools.map((t) => t.name), - envKeys: Object.keys(serverConfig.env ?? {}), - }; - }); - - this.webuiServer = new WebUIServer({ - agent: this.agent, - bridge: this.bridge, - memory: this.memory, - toolRegistry: this.toolRegistry, - plugins: this.modules - .filter((m) => this.toolRegistry.isPluginModule(m.name)) - .map((m) => ({ name: m.name, version: m.version ?? "0.0.0" })), - mcpServers, - config: this.config.webui, - configPath: this.configPath, - marketplace: { - modules: this.modules, - config: this.config, - sdkDeps: this.sdkDeps, - pluginContext, - loadedModuleNames: builtinNames, - rewireHooks: () => this.wirePluginEventHooks(), - }, - }); - await this.webuiServer.start(); - } catch (error) { - log.error({ err: error }, "❌ Failed to start WebUI server"); - log.warn("⚠️ Continuing without WebUI..."); - } - } - // Initialize message debouncer with bypass logic this.debouncer = new MessageDebouncer( { @@ -474,9 +514,6 @@ ${blue} ┌────────────────────── log.error({ err: error }, "Error enqueueing message"); } }); - - // Keep process alive - await new Promise(() => {}); } /** @@ -859,7 +896,10 @@ ${blue} ┌────────────────────── async stop(): Promise { log.info("👋 Stopping Teleton AI..."); - // Stop WebUI server first (if running) + // Stop agent subsystems via lifecycle + await this.lifecycle.stop(() => this.stopAgent()); + + // Stop WebUI server (if running) if (this.webuiServer) { try { await this.webuiServer.stop(); @@ -868,6 +908,19 @@ ${blue} ┌────────────────────── } } + // Close database last (shared with WebUI) + try { + closeDatabase(); + } catch (e) { + log.error({ err: e }, "⚠️ Database close failed"); + } + } + + /** + * Stop agent subsystems (watcher, MCP, debouncer, handler, modules, bridge). + * Called by lifecycle.stop() — do NOT call directly. + */ + private async stopAgent(): Promise { // Stop plugin watcher first if (this.pluginWatcher) { try { @@ -915,12 +968,6 @@ ${blue} ┌────────────────────── } catch (e) { log.error({ err: e }, "⚠️ Bridge disconnect failed"); } - - try { - closeDatabase(); - } catch (e) { - log.error({ err: e }, "⚠️ Database close failed"); - } } } diff --git a/src/sdk/__tests__/ton.test.ts b/src/sdk/__tests__/ton.test.ts index 3c4c3f8..b1972e0 100644 --- a/src/sdk/__tests__/ton.test.ts +++ b/src/sdk/__tests__/ton.test.ts @@ -963,57 +963,73 @@ describe("createTonSDK", () => { // UTILITY METHODS // ═══════════════════════════════════════════════════════════════ - // Note: toNano/fromNano/validateAddress use require() at runtime, - // which loads the real @ton/ton and @ton/core modules (not mocked). - // We test them against the real implementations. + // These now use top-level ESM imports (mocked by vi.mock). + // We configure the mock return values to match the real behaviour. describe("Utility methods", () => { describe("toNano()", () => { it("converts a number to nanoTON", () => { + mocks.toNano.mockReturnValue(BigInt("1500000000")); const result = sdk.toNano(1.5); + expect(mocks.toNano).toHaveBeenCalledWith("1.5"); expect(result).toBe(BigInt("1500000000")); }); it("converts a string to nanoTON", () => { + mocks.toNano.mockReturnValue(BigInt("2000000000")); const result = sdk.toNano("2"); + expect(mocks.toNano).toHaveBeenCalledWith("2"); expect(result).toBe(BigInt("2000000000")); }); it("converts zero", () => { + mocks.toNano.mockReturnValue(BigInt(0)); expect(sdk.toNano(0)).toBe(BigInt(0)); }); it("throws PluginSDKError on invalid input", () => { + mocks.toNano.mockImplementation(() => { + throw new Error("Invalid number"); + }); expect(() => sdk.toNano("not_a_number")).toThrow(PluginSDKError); }); }); describe("fromNano()", () => { it("converts nanoTON bigint to string", () => { + mocks.fromNano.mockReturnValue("1.5"); const result = sdk.fromNano(BigInt("1500000000")); expect(result).toBe("1.5"); }); it("converts nanoTON string to string", () => { + mocks.fromNano.mockReturnValue("3"); const result = sdk.fromNano("3000000000"); expect(result).toBe("3"); }); it("converts zero", () => { + mocks.fromNano.mockReturnValue("0"); expect(sdk.fromNano(BigInt(0))).toBe("0"); }); }); describe("validateAddress()", () => { it("returns true for a valid TON address", () => { - // Use the real @ton/core Address.parse + mocks.addressParse.mockReturnValue({}); expect(sdk.validateAddress(VALID_ADDRESS)).toBe(true); }); it("returns false for an invalid address", () => { + mocks.addressParse.mockImplementation(() => { + throw new Error("Invalid"); + }); expect(sdk.validateAddress("not-an-address")).toBe(false); }); it("returns false for empty string", () => { + mocks.addressParse.mockImplementation(() => { + throw new Error("Invalid"); + }); expect(sdk.validateAddress("")).toBe(false); }); }); diff --git a/src/sdk/ton.ts b/src/sdk/ton.ts index 6a9fd6a..295f46d 100644 --- a/src/sdk/ton.ts +++ b/src/sdk/ton.ts @@ -25,6 +25,9 @@ import { sendTon } from "../ton/transfer.js"; import { PAYMENT_TOLERANCE_RATIO } from "../constants/limits.js"; import { withBlockchainRetry } from "../utils/retry.js"; import { tonapiFetch } from "../constants/api-endpoints.js"; +import { toNano as tonToNano, fromNano as tonFromNano } from "@ton/ton"; +import { Address as TonAddress } from "@ton/core"; +import { withTxLock } from "../ton/tx-lock.js"; const DEFAULT_MAX_AGE_MINUTES = 10; @@ -246,7 +249,7 @@ export function createTonSDK(log: PluginLogger, db: Database.Database | null): T const { balance, wallet_address, jetton } = item; if (jetton.verification === "blacklist") continue; - const decimals = jetton.decimals || 9; + const decimals = jetton.decimals ?? 9; const rawBalance = BigInt(balance); const divisor = BigInt(10 ** decimals); const wholePart = rawBalance / divisor; @@ -332,95 +335,107 @@ export function createTonSDK(log: PluginLogger, db: Database.Database | null): T throw new PluginSDKError("Invalid recipient address", "INVALID_ADDRESS"); } - // Get sender's jetton wallet from balances - const jettonsResponse = await tonapiFetch(`/accounts/${walletData.address}/jettons`); - if (!jettonsResponse.ok) { - throw new PluginSDKError( - `Failed to fetch jetton balances: ${jettonsResponse.status}`, - "OPERATION_FAILED" + try { + // Get sender's jetton wallet from balances + const jettonsResponse = await tonapiFetch(`/accounts/${walletData.address}/jettons`); + if (!jettonsResponse.ok) { + throw new PluginSDKError( + `Failed to fetch jetton balances: ${jettonsResponse.status}`, + "OPERATION_FAILED" + ); + } + + const jettonsData = await jettonsResponse.json(); + const jettonBalance = jettonsData.balances?.find( + (b: any) => + b.jetton.address.toLowerCase() === jettonAddress.toLowerCase() || + Address.parse(b.jetton.address).toString() === Address.parse(jettonAddress).toString() ); - } - const jettonsData = await jettonsResponse.json(); - const jettonBalance = jettonsData.balances?.find( - (b: any) => - b.jetton.address.toLowerCase() === jettonAddress.toLowerCase() || - Address.parse(b.jetton.address).toString() === Address.parse(jettonAddress).toString() - ); + if (!jettonBalance) { + throw new PluginSDKError( + `You don't own any of this jetton: ${jettonAddress}`, + "OPERATION_FAILED" + ); + } - if (!jettonBalance) { - throw new PluginSDKError( - `You don't own any of this jetton: ${jettonAddress}`, - "OPERATION_FAILED" - ); - } + const senderJettonWallet = jettonBalance.wallet_address.address; + const decimals = jettonBalance.jetton.decimals ?? 9; + const currentBalance = BigInt(jettonBalance.balance); + const amountStr = amount.toFixed(decimals); + const [whole, frac = ""] = amountStr.split("."); + const amountInUnits = BigInt(whole + (frac + "0".repeat(decimals)).slice(0, decimals)); - const senderJettonWallet = jettonBalance.wallet_address.address; - const decimals = jettonBalance.jetton.decimals || 9; - const currentBalance = BigInt(jettonBalance.balance); - const amountStr = amount.toFixed(decimals); - const [whole, frac = ""] = amountStr.split("."); - const amountInUnits = BigInt(whole + (frac + "0".repeat(decimals)).slice(0, decimals)); + if (amountInUnits > currentBalance) { + throw new PluginSDKError( + `Insufficient balance. Have ${Number(currentBalance) / 10 ** decimals}, need ${amount}`, + "OPERATION_FAILED" + ); + } - if (amountInUnits > currentBalance) { - throw new PluginSDKError( - `Insufficient balance. Have ${Number(currentBalance) / 10 ** decimals}, need ${amount}`, - "OPERATION_FAILED" - ); - } + const comment = opts?.comment; - const comment = opts?.comment; + // Build forward payload (comment) + let forwardPayload = beginCell().endCell(); + if (comment) { + forwardPayload = beginCell().storeUint(0, 32).storeStringTail(comment).endCell(); + } - // Build forward payload (comment) - let forwardPayload = beginCell().endCell(); - if (comment) { - forwardPayload = beginCell().storeUint(0, 32).storeStringTail(comment).endCell(); - } + // TEP-74 transfer message body + const JETTON_TRANSFER_OP = 0xf8a7ea5; + const messageBody = beginCell() + .storeUint(JETTON_TRANSFER_OP, 32) + .storeUint(0, 64) // query_id + .storeCoins(amountInUnits) + .storeAddress(Address.parse(to)) + .storeAddress(Address.parse(walletData.address)) // response_destination + .storeBit(false) // no custom_payload + .storeCoins(comment ? toNano("0.01") : BigInt(1)) // forward_ton_amount + .storeBit(comment ? true : false) + .storeMaybeRef(comment ? forwardPayload : null) + .endCell(); + + const keyPair = await getKeyPair(); + if (!keyPair) { + throw new PluginSDKError("Wallet key derivation failed", "OPERATION_FAILED"); + } - // TEP-74 transfer message body - const JETTON_TRANSFER_OP = 0xf8a7ea5; - const messageBody = beginCell() - .storeUint(JETTON_TRANSFER_OP, 32) - .storeUint(0, 64) // query_id - .storeCoins(amountInUnits) - .storeAddress(Address.parse(to)) - .storeAddress(Address.parse(walletData.address)) // response_destination - .storeBit(false) // no custom_payload - .storeCoins(comment ? toNano("0.01") : BigInt(1)) // forward_ton_amount - .storeBit(comment ? true : false) - .storeMaybeRef(comment ? forwardPayload : null) - .endCell(); - - const keyPair = await getKeyPair(); - if (!keyPair) { - throw new PluginSDKError("Wallet key derivation failed", "OPERATION_FAILED"); - } + const seqno = await withTxLock(async () => { + const wallet = WalletContractV5R1.create({ + workchain: 0, + publicKey: keyPair.publicKey, + }); - const wallet = WalletContractV5R1.create({ - workchain: 0, - publicKey: keyPair.publicKey, - }); - - const endpoint = await getCachedHttpEndpoint(); - const client = new TonClient({ endpoint }); - const walletContract = client.open(wallet); - const seqno = await walletContract.getSeqno(); - - await walletContract.sendTransfer({ - seqno, - secretKey: keyPair.secretKey, - sendMode: SendMode.PAY_GAS_SEPARATELY + SendMode.IGNORE_ERRORS, - messages: [ - internal({ - to: Address.parse(senderJettonWallet), - value: toNano("0.05"), - body: messageBody, - bounce: true, - }), - ], - }); - - return { success: true, seqno }; + const endpoint = await getCachedHttpEndpoint(); + const client = new TonClient({ endpoint }); + const walletContract = client.open(wallet); + const seq = await walletContract.getSeqno(); + + await walletContract.sendTransfer({ + seqno: seq, + secretKey: keyPair.secretKey, + sendMode: SendMode.PAY_GAS_SEPARATELY, + messages: [ + internal({ + to: Address.parse(senderJettonWallet), + value: toNano("0.05"), + body: messageBody, + bounce: true, + }), + ], + }); + + return seq; + }); + + return { success: true, seqno }; + } catch (err) { + if (err instanceof PluginSDKError) throw err; + throw new PluginSDKError( + `Failed to send jetton: ${err instanceof Error ? err.message : String(err)}`, + "OPERATION_FAILED" + ); + } }, async getJettonWalletAddress( @@ -498,8 +513,7 @@ export function createTonSDK(log: PluginLogger, db: Database.Database | null): T toNano(amount: number | string): bigint { try { - const { toNano: convert } = require("@ton/ton"); - return convert(String(amount)); + return tonToNano(String(amount)); } catch (err) { throw new PluginSDKError( `toNano conversion failed: ${err instanceof Error ? err.message : String(err)}`, @@ -509,14 +523,12 @@ export function createTonSDK(log: PluginLogger, db: Database.Database | null): T }, fromNano(nano: bigint | string): string { - const { fromNano: convert } = require("@ton/ton"); - return convert(nano); + return tonFromNano(nano); }, validateAddress(address: string): boolean { try { - const { Address } = require("@ton/core"); - Address.parse(address); + TonAddress.parse(address); return true; } catch { return false; diff --git a/src/ton/transfer.ts b/src/ton/transfer.ts index af59ddd..d13e55a 100644 --- a/src/ton/transfer.ts +++ b/src/ton/transfer.ts @@ -3,6 +3,7 @@ import { Address, SendMode } from "@ton/core"; import { getCachedHttpEndpoint } from "./endpoint.js"; import { getKeyPair } from "./wallet-service.js"; import { createLogger } from "../utils/logger.js"; +import { withTxLock } from "./tx-lock.js"; const log = createLogger("TON"); @@ -14,60 +15,62 @@ export interface SendTonParams { } export async function sendTon(params: SendTonParams): Promise { - try { - const { toAddress, amount, comment = "", bounce = false } = params; + return withTxLock(async () => { + try { + const { toAddress, amount, comment = "", bounce = false } = params; - if (!Number.isFinite(amount) || amount <= 0) { - log.error({ amount }, "Invalid transfer amount"); - return null; - } + if (!Number.isFinite(amount) || amount <= 0) { + log.error({ amount }, "Invalid transfer amount"); + return null; + } - let recipientAddress: Address; - try { - recipientAddress = Address.parse(toAddress); - } catch (e) { - log.error({ err: e }, `Invalid recipient address: ${toAddress}`); - return null; - } + let recipientAddress: Address; + try { + recipientAddress = Address.parse(toAddress); + } catch (e) { + log.error({ err: e }, `Invalid recipient address: ${toAddress}`); + return null; + } - const keyPair = await getKeyPair(); - if (!keyPair) { - log.error("Wallet not initialized"); - return null; - } + const keyPair = await getKeyPair(); + if (!keyPair) { + log.error("Wallet not initialized"); + return null; + } - const wallet = WalletContractV5R1.create({ - workchain: 0, - publicKey: keyPair.publicKey, - }); + const wallet = WalletContractV5R1.create({ + workchain: 0, + publicKey: keyPair.publicKey, + }); - const endpoint = await getCachedHttpEndpoint(); - const client = new TonClient({ endpoint }); - const contract = client.open(wallet); + const endpoint = await getCachedHttpEndpoint(); + const client = new TonClient({ endpoint }); + const contract = client.open(wallet); - const seqno = await contract.getSeqno(); + const seqno = await contract.getSeqno(); - await contract.sendTransfer({ - seqno, - secretKey: keyPair.secretKey, - sendMode: SendMode.PAY_GAS_SEPARATELY, - messages: [ - internal({ - to: recipientAddress, - value: toNano(amount), - body: comment, - bounce, - }), - ], - }); + await contract.sendTransfer({ + seqno, + secretKey: keyPair.secretKey, + sendMode: SendMode.PAY_GAS_SEPARATELY, + messages: [ + internal({ + to: recipientAddress, + value: toNano(amount), + body: comment, + bounce, + }), + ], + }); - const pseudoHash = `${seqno}_${Date.now()}_${amount.toFixed(2)}`; + const pseudoHash = `${seqno}_${Date.now()}_${amount.toFixed(2)}`; - log.info(`Sent ${amount} TON to ${toAddress.slice(0, 8)}... - seqno: ${seqno}`); + log.info(`Sent ${amount} TON to ${toAddress.slice(0, 8)}... - seqno: ${seqno}`); - return pseudoHash; - } catch (error) { - log.error({ err: error }, "Error sending TON"); - return null; - } + return pseudoHash; + } catch (error) { + log.error({ err: error }, "Error sending TON"); + return null; + } + }); // withTxLock } diff --git a/src/ton/tx-lock.ts b/src/ton/tx-lock.ts new file mode 100644 index 0000000..d85a75d --- /dev/null +++ b/src/ton/tx-lock.ts @@ -0,0 +1,15 @@ +/** + * Simple async mutex for TON wallet transactions. + * Ensures the seqno read → sendTransfer sequence is atomic, + * preventing two concurrent calls from getting the same seqno. + */ +let pending: Promise = Promise.resolve(); + +export function withTxLock(fn: () => Promise): Promise { + const execute = pending.then(fn, fn); + pending = execute.then( + () => {}, + () => {} + ); + return execute; +} diff --git a/src/utils/gramjs-bigint.ts b/src/utils/gramjs-bigint.ts index ce566e7..8e58ac3 100644 --- a/src/utils/gramjs-bigint.ts +++ b/src/utils/gramjs-bigint.ts @@ -1,9 +1,10 @@ import { randomBytes } from "crypto"; -import bigInt, { type BigInteger } from "big-integer"; +import { type BigInteger } from "big-integer"; +import { returnBigInt } from "telegram/Helpers.js"; /** Convert native bigint or number to BigInteger for GramJS TL long fields */ export function toLong(value: bigint | number): BigInteger { - return bigInt(String(value)); + return returnBigInt(String(value)); } /** Generate cryptographically random BigInteger for randomId / poll ID fields */ diff --git a/src/webui/__tests__/agent-routes.test.ts b/src/webui/__tests__/agent-routes.test.ts new file mode 100644 index 0000000..f6265df --- /dev/null +++ b/src/webui/__tests__/agent-routes.test.ts @@ -0,0 +1,196 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { Hono } from "hono"; + +vi.mock("../../utils/logger.js", () => ({ + createLogger: vi.fn(() => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + })), +})); + +import { AgentLifecycle } from "../../agent/lifecycle.js"; + +// Build a minimal Hono app that mirrors the agent routes from server.ts +function createTestApp(lifecycle?: AgentLifecycle) { + const app = new Hono(); + + // Simulate auth middleware: all requests are authenticated (we test auth separately) + app.post("/api/agent/start", async (c) => { + if (!lifecycle) { + return c.json({ error: "Agent lifecycle not available" }, 503); + } + const state = lifecycle.getState(); + if (state === "running") { + return c.json({ state: "running" }, 409); + } + if (state === "stopping") { + return c.json({ error: "Agent is currently stopping, please wait" }, 409); + } + lifecycle.start().catch(() => {}); + return c.json({ state: "starting" }); + }); + + app.post("/api/agent/stop", async (c) => { + if (!lifecycle) { + return c.json({ error: "Agent lifecycle not available" }, 503); + } + const state = lifecycle.getState(); + if (state === "stopped") { + return c.json({ state: "stopped" }, 409); + } + if (state === "starting") { + return c.json({ error: "Agent is currently starting, please wait" }, 409); + } + lifecycle.stop().catch(() => {}); + return c.json({ state: "stopping" }); + }); + + app.get("/api/agent/status", (c) => { + if (!lifecycle) { + return c.json({ error: "Agent lifecycle not available" }, 503); + } + return c.json({ + state: lifecycle.getState(), + uptime: lifecycle.getUptime(), + error: lifecycle.getError() ?? null, + }); + }); + + return app; +} + +describe("Agent Lifecycle API Routes", () => { + let lifecycle: AgentLifecycle; + let app: ReturnType; + + beforeEach(() => { + lifecycle = new AgentLifecycle(); + lifecycle.registerCallbacks( + async () => {}, + async () => {} + ); + app = createTestApp(lifecycle); + }); + + // 1. POST /api/agent/start — agent stopped + it("POST /api/agent/start returns 200 with starting when agent stopped", async () => { + const res = await app.request("/api/agent/start", { method: "POST" }); + expect(res.status).toBe(200); + const data = await res.json(); + expect(data.state).toBe("starting"); + }); + + // 2. POST /api/agent/start — agent already running + it("POST /api/agent/start returns 409 when agent already running", async () => { + await lifecycle.start(); + expect(lifecycle.getState()).toBe("running"); + + const res = await app.request("/api/agent/start", { method: "POST" }); + expect(res.status).toBe(409); + const data = await res.json(); + expect(data.state).toBe("running"); + }); + + // 3. POST /api/agent/start — agent stopping + it("POST /api/agent/start returns 409 when agent stopping", async () => { + await lifecycle.start(); + let resolveStop!: () => void; + lifecycle.stop( + () => + new Promise((resolve) => { + resolveStop = resolve; + }) + ); + + const res = await app.request("/api/agent/start", { method: "POST" }); + expect(res.status).toBe(409); + const data = await res.json(); + expect(data.error).toContain("stopping"); + + resolveStop(); + }); + + // 4. POST /api/agent/stop — agent running + it("POST /api/agent/stop returns 200 with stopping when agent running", async () => { + await lifecycle.start(); + + const res = await app.request("/api/agent/stop", { method: "POST" }); + expect(res.status).toBe(200); + const data = await res.json(); + expect(data.state).toBe("stopping"); + }); + + // 5. POST /api/agent/stop — agent already stopped + it("POST /api/agent/stop returns 409 when agent already stopped", async () => { + const res = await app.request("/api/agent/stop", { method: "POST" }); + expect(res.status).toBe(409); + const data = await res.json(); + expect(data.state).toBe("stopped"); + }); + + // 6. POST /api/agent/stop — agent starting + it("POST /api/agent/stop returns 409 when agent starting", async () => { + let resolveStart!: () => void; + lifecycle.start( + () => + new Promise((resolve) => { + resolveStart = resolve; + }) + ); + + const res = await app.request("/api/agent/stop", { method: "POST" }); + expect(res.status).toBe(409); + const data = await res.json(); + expect(data.error).toContain("starting"); + + resolveStart(); + }); + + // 7. GET /api/agent/status — returns current state + it("GET /api/agent/status returns current state", async () => { + const res = await app.request("/api/agent/status"); + expect(res.status).toBe(200); + const data = await res.json(); + expect(data.state).toBe("stopped"); + expect(data.uptime).toBeNull(); + expect(data.error).toBeNull(); + }); + + // 8. All endpoints reject unauthenticated requests + // (Auth is handled by WebUIServer middleware, not route-level — skipped here as + // the routes are under /api/* which has auth middleware. Tested via integration.) + + // 9. GET /api/agent/events — SSE content-type + // (Tested in agent-sse.test.ts) + + // 10. POST /api/agent/start — lifecycle not provided + it("returns 503 when lifecycle not provided", async () => { + const noLifecycleApp = createTestApp(undefined); + + const startRes = await noLifecycleApp.request("/api/agent/start", { method: "POST" }); + expect(startRes.status).toBe(503); + + const stopRes = await noLifecycleApp.request("/api/agent/stop", { method: "POST" }); + expect(stopRes.status).toBe(503); + + const statusRes = await noLifecycleApp.request("/api/agent/status"); + expect(statusRes.status).toBe(503); + }); + + // 11. GET /api/agent/status — uptime is number when running, null when stopped + it("status uptime is number when running, null when stopped", async () => { + // Stopped + let res = await app.request("/api/agent/status"); + let data = await res.json(); + expect(data.uptime).toBeNull(); + + // Running + await lifecycle.start(); + res = await app.request("/api/agent/status"); + data = await res.json(); + expect(typeof data.uptime).toBe("number"); + expect(data.uptime).toBeGreaterThanOrEqual(0); + }); +}); diff --git a/src/webui/__tests__/agent-sse.test.ts b/src/webui/__tests__/agent-sse.test.ts new file mode 100644 index 0000000..eacd36e --- /dev/null +++ b/src/webui/__tests__/agent-sse.test.ts @@ -0,0 +1,226 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { Hono } from "hono"; +import { streamSSE } from "hono/streaming"; + +vi.mock("../../utils/logger.js", () => ({ + createLogger: vi.fn(() => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + })), +})); + +import { AgentLifecycle, type StateChangeEvent } from "../../agent/lifecycle.js"; + +/** Parse SSE text into structured events */ +function parseSSE(text: string): Array<{ event?: string; data?: string; id?: string }> { + const events: Array<{ event?: string; data?: string; id?: string }> = []; + const blocks = text.split("\n\n").filter(Boolean); + for (const block of blocks) { + const entry: { event?: string; data?: string; id?: string } = {}; + for (const line of block.split("\n")) { + if (line.startsWith("event:")) entry.event = line.slice(6).trim(); + else if (line.startsWith("data:")) entry.data = line.slice(5).trim(); + else if (line.startsWith("id:")) entry.id = line.slice(3).trim(); + } + if (entry.event || entry.data) events.push(entry); + } + return events; +} + +/** Build a mini Hono app with the SSE endpoint mirroring server.ts */ +function createSSEApp(lifecycle: AgentLifecycle) { + const app = new Hono(); + + app.get("/api/agent/events", (c) => { + return streamSSE(c, async (stream) => { + let aborted = false; + stream.onAbort(() => { + aborted = true; + }); + + const now = Date.now(); + await stream.writeSSE({ + event: "status", + id: String(now), + data: JSON.stringify({ + state: lifecycle.getState(), + error: lifecycle.getError() ?? null, + timestamp: now, + }), + retry: 3000, + }); + + const onStateChange = (event: StateChangeEvent) => { + if (aborted) return; + stream.writeSSE({ + event: "status", + id: String(event.timestamp), + data: JSON.stringify({ + state: event.state, + error: event.error ?? null, + timestamp: event.timestamp, + }), + }); + }; + + lifecycle.on("stateChange", onStateChange); + + // For testing: don't loop forever — just wait briefly for events to propagate + await stream.sleep(50); + + lifecycle.off("stateChange", onStateChange); + }); + }); + + return app; +} + +describe("Agent SSE Endpoint", () => { + let lifecycle: AgentLifecycle; + let app: ReturnType; + + beforeEach(() => { + lifecycle = new AgentLifecycle(); + lifecycle.registerCallbacks( + async () => {}, + async () => {} + ); + app = createSSEApp(lifecycle); + }); + + // 1. Initial connection pushes current state + it("initial connection pushes current state", async () => { + const res = await app.request("/api/agent/events"); + expect(res.status).toBe(200); + expect(res.headers.get("content-type")).toContain("text/event-stream"); + + const text = await res.text(); + const events = parseSSE(text); + expect(events.length).toBeGreaterThanOrEqual(1); + expect(events[0].event).toBe("status"); + const data = JSON.parse(events[0].data!); + expect(data.state).toBe("stopped"); + }); + + // 2. State change emits SSE event + it("state change emits SSE event", async () => { + // Start agent so state is "running" when SSE connects + await lifecycle.start(); + + const sseApp = new Hono(); + sseApp.get("/events", (c) => { + return streamSSE(c, async (stream) => { + let aborted = false; + stream.onAbort(() => { + aborted = true; + }); + + // Push current state + await stream.writeSSE({ + event: "status", + data: JSON.stringify({ state: lifecycle.getState() }), + }); + + // Listen for state change then close + const onStateChange = (event: StateChangeEvent) => { + if (aborted) return; + stream.writeSSE({ + event: "status", + data: JSON.stringify({ state: event.state }), + }); + }; + + lifecycle.on("stateChange", onStateChange); + + // Trigger a stop during the stream + lifecycle.stop().catch(() => {}); + + await stream.sleep(50); + lifecycle.off("stateChange", onStateChange); + }); + }); + + const res = await sseApp.request("/events"); + const text = await res.text(); + const events = parseSSE(text); + + // Should have initial "running" and then "stopping" and "stopped" + const states = events.map((e) => JSON.parse(e.data!).state); + expect(states).toContain("running"); + expect(states).toContain("stopped"); + }); + + // 3. Heartbeat sent after interval (we use short interval for test) + it("heartbeat (ping) is sent", async () => { + const sseApp = new Hono(); + sseApp.get("/events", (c) => { + return streamSSE(c, async (stream) => { + // Send a ping immediately for test purposes + await stream.writeSSE({ event: "ping", data: "" }); + }); + }); + + const res = await sseApp.request("/events"); + const text = await res.text(); + const events = parseSSE(text); + const pings = events.filter((e) => e.event === "ping"); + expect(pings.length).toBeGreaterThanOrEqual(1); + }); + + // 4. Client disconnect removes listener + it("client disconnect removes listener", async () => { + const initialListenerCount = lifecycle.listenerCount("stateChange"); + + // After SSE stream ends, listeners should be cleaned up + const res = await app.request("/api/agent/events"); + await res.text(); // consume stream + + // Listener should have been removed + expect(lifecycle.listenerCount("stateChange")).toBe(initialListenerCount); + }); + + // 5. Multiple concurrent SSE clients + it("multiple concurrent SSE clients receive events independently", async () => { + const res1 = app.request("/api/agent/events"); + const res2 = app.request("/api/agent/events"); + + const [r1, r2] = await Promise.all([res1, res2]); + const text1 = await r1.text(); + const text2 = await r2.text(); + + // Both should have received the initial status event + const events1 = parseSSE(text1); + const events2 = parseSSE(text2); + expect(events1.length).toBeGreaterThanOrEqual(1); + expect(events2.length).toBeGreaterThanOrEqual(1); + expect(events1[0].event).toBe("status"); + expect(events2[0].event).toBe("status"); + }); + + // 6. Error in stream handler doesn't crash server + it("error in stream handler does not crash server", async () => { + const errorApp = new Hono(); + errorApp.get("/events", (c) => { + return streamSSE(c, async (stream) => { + await stream.writeSSE({ event: "status", data: '{"state":"stopped"}' }); + // Simulate error — stream closes but server stays up + throw new Error("simulated stream error"); + }); + }); + + // Should not throw + const res = await errorApp.request("/events"); + expect(res.status).toBe(200); + // Stream still returned something before the error + const text = await res.text(); + expect(text).toContain("status"); + }); + + // Extra: SSE content-type header + it("returns text/event-stream content type", async () => { + const res = await app.request("/api/agent/events"); + expect(res.headers.get("content-type")).toContain("text/event-stream"); + }); +}); diff --git a/src/webui/routes/mcp.ts b/src/webui/routes/mcp.ts index 32884da..0d83bea 100644 --- a/src/webui/routes/mcp.ts +++ b/src/webui/routes/mcp.ts @@ -12,9 +12,10 @@ export function createMcpRoutes(deps: WebUIServerDeps) { // List all MCP servers with their connection status and tools app.get("/", (c) => { + const servers = typeof deps.mcpServers === "function" ? deps.mcpServers() : deps.mcpServers; const response: APIResponse = { success: true, - data: deps.mcpServers, + data: servers, }; return c.json(response); }); diff --git a/src/webui/routes/plugins.ts b/src/webui/routes/plugins.ts index 6915620..fdcc602 100644 --- a/src/webui/routes/plugins.ts +++ b/src/webui/routes/plugins.ts @@ -4,13 +4,15 @@ import type { WebUIServerDeps, APIResponse, LoadedPlugin } from "../types.js"; export function createPluginsRoutes(deps: WebUIServerDeps) { const app = new Hono(); - // List all loaded plugins + // List all loaded plugins — computed dynamically so plugins loaded after + // WebUI startup (via startAgent) are always reflected in the response. app.get("/", (c) => { - const response: APIResponse = { - success: true, - data: deps.plugins, - }; - return c.json(response); + const data = deps.marketplace + ? deps.marketplace.modules + .filter((m) => deps.toolRegistry.isPluginModule(m.name)) + .map((m) => ({ name: m.name, version: m.version ?? "0.0.0" })) + : deps.plugins; + return c.json>({ success: true, data }); }); return app; diff --git a/src/webui/server.ts b/src/webui/server.ts index f76e84a..2b36185 100644 --- a/src/webui/server.ts +++ b/src/webui/server.ts @@ -1,12 +1,14 @@ import { Hono } from "hono"; import { serve } from "@hono/node-server"; import { cors } from "hono/cors"; +import { streamSSE } from "hono/streaming"; import { bodyLimit } from "hono/body-limit"; import { setCookie, getCookie, deleteCookie } from "hono/cookie"; import { existsSync, readFileSync } from "node:fs"; import { join, dirname, resolve, relative } from "node:path"; import { fileURLToPath } from "node:url"; import type { WebUIServerDeps } from "./types.js"; +import type { StateChangeEvent } from "../agent/lifecycle.js"; import { createLogger } from "../utils/logger.js"; const log = createLogger("WebUI"); @@ -205,6 +207,113 @@ export class WebUIServer { this.app.route("/api/config", createConfigRoutes(this.deps)); this.app.route("/api/marketplace", createMarketplaceRoutes(this.deps)); + // Agent lifecycle routes + this.app.post("/api/agent/start", async (c) => { + const lifecycle = this.deps.lifecycle; + if (!lifecycle) { + return c.json({ error: "Agent lifecycle not available" }, 503); + } + const state = lifecycle.getState(); + if (state === "running") { + return c.json({ state: "running" }, 409); + } + if (state === "stopping") { + return c.json({ error: "Agent is currently stopping, please wait" }, 409); + } + // Fire-and-forget: start is async, we return immediately + lifecycle.start().catch((err: Error) => { + log.error({ err }, "Agent start failed"); + }); + return c.json({ state: "starting" }); + }); + + this.app.post("/api/agent/stop", async (c) => { + const lifecycle = this.deps.lifecycle; + if (!lifecycle) { + return c.json({ error: "Agent lifecycle not available" }, 503); + } + const state = lifecycle.getState(); + if (state === "stopped") { + return c.json({ state: "stopped" }, 409); + } + if (state === "starting") { + return c.json({ error: "Agent is currently starting, please wait" }, 409); + } + // Fire-and-forget: stop is async, we return immediately + lifecycle.stop().catch((err: Error) => { + log.error({ err }, "Agent stop failed"); + }); + return c.json({ state: "stopping" }); + }); + + this.app.get("/api/agent/status", (c) => { + const lifecycle = this.deps.lifecycle; + if (!lifecycle) { + return c.json({ error: "Agent lifecycle not available" }, 503); + } + return c.json({ + state: lifecycle.getState(), + uptime: lifecycle.getUptime(), + error: lifecycle.getError() ?? null, + }); + }); + + this.app.get("/api/agent/events", (c) => { + const lifecycle = this.deps.lifecycle; + if (!lifecycle) { + return c.json({ error: "Agent lifecycle not available" }, 503); + } + + return streamSSE(c, async (stream) => { + let aborted = false; + + stream.onAbort(() => { + aborted = true; + }); + + // Push current state immediately on connection + const now = Date.now(); + await stream.writeSSE({ + event: "status", + id: String(now), + data: JSON.stringify({ + state: lifecycle.getState(), + error: lifecycle.getError() ?? null, + timestamp: now, + }), + retry: 3000, + }); + + // Listen for state changes + const onStateChange = (event: StateChangeEvent) => { + if (aborted) return; + stream.writeSSE({ + event: "status", + id: String(event.timestamp), + data: JSON.stringify({ + state: event.state, + error: event.error ?? null, + timestamp: event.timestamp, + }), + }); + }; + + lifecycle.on("stateChange", onStateChange); + + // Heartbeat loop + keep connection alive + while (!aborted) { + await stream.sleep(30_000); + if (aborted) break; + await stream.writeSSE({ + event: "ping", + data: "", + }); + } + + lifecycle.off("stateChange", onStateChange); + }); + }); + // Serve static files in production (if built) const webDist = findWebDist(); if (webDist) { diff --git a/src/webui/types.ts b/src/webui/types.ts index d7d50e0..e5e1354 100644 --- a/src/webui/types.ts +++ b/src/webui/types.ts @@ -6,6 +6,7 @@ import type { WebUIConfig, Config } from "../config/schema.js"; import type { Database } from "better-sqlite3"; import type { PluginModule, PluginContext } from "../agent/tools/types.js"; import type { SDKDependencies } from "../sdk/index.js"; +import type { AgentLifecycle } from "../agent/lifecycle.js"; export interface LoadedPlugin { name: string; @@ -14,7 +15,7 @@ export interface LoadedPlugin { export interface McpServerInfo { name: string; - type: "stdio" | "sse"; + type: "stdio" | "sse" | "streamable-http"; target: string; scope: string; enabled: boolean; @@ -34,9 +35,10 @@ export interface WebUIServerDeps { }; toolRegistry: ToolRegistry; plugins: LoadedPlugin[]; - mcpServers: McpServerInfo[]; + mcpServers: McpServerInfo[] | (() => McpServerInfo[]); config: WebUIConfig; configPath: string; + lifecycle?: AgentLifecycle; marketplace?: MarketplaceDeps; } diff --git a/web/src/components/AgentControl.tsx b/web/src/components/AgentControl.tsx new file mode 100644 index 0000000..ec49bce --- /dev/null +++ b/web/src/components/AgentControl.tsx @@ -0,0 +1,224 @@ +import { useState, useRef, useCallback } from 'react'; +import { createPortal } from 'react-dom'; +import { useAgentStatus, AgentState } from '../hooks/useAgentStatus'; + +const API_BASE = '/api'; +const MAX_START_RETRIES = 3; +const RETRY_DELAYS = [1000, 2000, 4000]; + +function jitter(ms: number): number { + return ms + ms * 0.3 * Math.random(); +} + +const STATE_CONFIG: Record = { + stopped: { dot: 'var(--text-tertiary)', label: 'Stopped', pulse: false }, + starting: { dot: '#FFD60A', label: 'Starting...', pulse: true }, + running: { dot: 'var(--green)', label: 'Running', pulse: true }, + stopping: { dot: '#FF9F0A', label: 'Stopping...', pulse: true }, + error: { dot: 'var(--red)', label: 'Error', pulse: false }, +}; + +export function AgentControl() { + const { state, error } = useAgentStatus(); + const [inflight, setInflight] = useState(false); + const [showConfirm, setShowConfirm] = useState(false); + const [actionError, setActionError] = useState(null); + const [retrying, setRetrying] = useState(false); + const retryTimerRef = useRef | null>(null); + const abortRef = useRef(null); + + const displayState = error && state === 'stopped' ? 'error' : state; + const config = STATE_CONFIG[displayState]; + + const clearRetry = useCallback(() => { + if (retryTimerRef.current) { + clearTimeout(retryTimerRef.current); + retryTimerRef.current = null; + } + setRetrying(false); + }, []); + + const doStart = useCallback(async (attempt = 0): Promise => { + setInflight(true); + setActionError(null); + abortRef.current = new AbortController(); + + try { + const res = await fetch(`${API_BASE}/agent/start`, { + method: 'POST', + credentials: 'include', + signal: AbortSignal.timeout(10_000), + }); + const json = await res.json(); + + if (!res.ok && json.error) { + throw new Error(json.error); + } + } catch (err) { + if (!retryTimerRef.current && attempt < MAX_START_RETRIES) { + setRetrying(true); + const delay = jitter(RETRY_DELAYS[attempt] ?? 4000); + retryTimerRef.current = setTimeout(() => { + retryTimerRef.current = null; + doStart(attempt + 1); + }, delay); + setActionError(err instanceof Error ? err.message : String(err)); + setInflight(false); + return; + } + setActionError(err instanceof Error ? err.message : String(err)); + setRetrying(false); + } finally { + setInflight(false); + abortRef.current = null; + } + }, []); + + const doStop = useCallback(async () => { + setShowConfirm(false); + setInflight(true); + setActionError(null); + clearRetry(); + + try { + const res = await fetch(`${API_BASE}/agent/stop`, { + method: 'POST', + credentials: 'include', + signal: AbortSignal.timeout(10_000), + }); + const json = await res.json(); + + if (!res.ok && json.error) { + throw new Error(json.error); + } + } catch (err) { + setActionError(err instanceof Error ? err.message : String(err)); + } finally { + setInflight(false); + } + }, [clearRetry]); + + const handleStart = () => { + clearRetry(); + doStart(0); + }; + + const handleStopClick = () => { + setShowConfirm(true); + }; + + const showPlay = displayState === 'stopped' || displayState === 'error'; + const showStop = displayState === 'running'; + + return ( +
+ {/* Status badge */} +
+ + + {config.label} + +
+ + {/* Action button */} + {showPlay && !retrying && ( + + )} + + {showStop && ( + + )} + + {/* Retry indicator */} + {retrying && ( +
+ Retrying... +
+ )} + + {/* Error message */} + {actionError && !retrying && ( +
+ {actionError} +
+ )} + + {/* Stop confirmation dialog — portal to body so it's above the sidebar */} + {showConfirm && createPortal( +
setShowConfirm(false)}> +
e.stopPropagation()} style={{ maxWidth: '360px' }}> +

Stop Agent?

+

+ Active Telegram sessions will be interrupted. +

+
+ + +
+
+
, + document.body + )} + + {/* Pulse animation */} + +
+ ); +} diff --git a/web/src/components/Layout.tsx b/web/src/components/Layout.tsx index 110f4be..82a6f6a 100644 --- a/web/src/components/Layout.tsx +++ b/web/src/components/Layout.tsx @@ -1,5 +1,6 @@ import { Link, useLocation } from 'react-router-dom'; import { Shell } from './Shell'; +import { AgentControl } from './AgentControl'; import { logout } from '../lib/api'; function DashboardNav() { @@ -25,13 +26,16 @@ function DashboardNav() { MCP Config -
- +
+ +
+ +
); diff --git a/web/src/components/setup/ConfigStep.tsx b/web/src/components/setup/ConfigStep.tsx index bf8b3c8..9347dfa 100644 --- a/web/src/components/setup/ConfigStep.tsx +++ b/web/src/components/setup/ConfigStep.tsx @@ -163,7 +163,7 @@ export function ConfigStep({ data, onChange }: StepProps) { {/* ── Optional Integrations ── */}

- Optional Integrations + Optional API Keys

@@ -172,7 +172,7 @@ export function ConfigStep({ data, onChange }: StepProps) {
Bot Token - (optional) + (recommended)

Welcome to Teleton Setup

- Configure your autonomous Telegram agent in a few steps: + Configure your autonomous Telegram agent in a few steps.

+
+ Security Notice +
+ This software is an autonomous AI agent that can: +
    +
  • Send and receive Telegram messages on your behalf
  • +
  • Execute cryptocurrency transactions using your wallet
  • +
  • Access and store conversation data
  • +
  • Make decisions and take actions autonomously
  • +
+ You are solely responsible for all actions taken by this agent. + By proceeding, you acknowledge that you understand these risks + and accept full responsibility for the agent's behavior. +

+ Never share your API keys, wallet mnemonics, or session files. +
+
+
-
-
-
1. Connect an LLM provider (Anthropic, OpenAI, etc.)
-
2. Link your Telegram account
-
3. Set up a TON wallet for crypto operations
-
4. Configure behavior and optional modules
-
-
- -
- IMPORTANT SECURITY NOTICE -

- This software is an autonomous AI agent that can: -
    -
  • Send and receive Telegram messages on your behalf
  • -
  • Execute cryptocurrency transactions using your wallet
  • -
  • Access and store conversation data
  • -
  • Make decisions and take actions autonomously
  • -
- You are solely responsible for all actions taken by this agent. - By proceeding, you acknowledge that you understand these risks - and accept full responsibility for the agent's behavior. -

- Never share your API keys, wallet mnemonics, or session files. -
- {status?.configExists && (
Existing configuration detected. It will be overwritten when setup completes. diff --git a/web/src/hooks/useAgentStatus.ts b/web/src/hooks/useAgentStatus.ts new file mode 100644 index 0000000..faf8670 --- /dev/null +++ b/web/src/hooks/useAgentStatus.ts @@ -0,0 +1,147 @@ +import { useEffect, useRef, useState } from 'react'; + +export type AgentState = 'stopped' | 'starting' | 'running' | 'stopping'; + +interface AgentStatusEvent { + state: AgentState; + error: string | null; + timestamp: number; +} + +const SSE_URL = '/api/agent/events'; +const POLL_URL = '/api/agent/status'; +const MAX_RETRIES = 5; +const MAX_BACKOFF_MS = 30_000; +const POLL_INTERVAL_MS = 3_000; + +function backoffMs(attempt: number): number { + const base = Math.min(1000 * 2 ** attempt, MAX_BACKOFF_MS); + const jitter = base * 0.3 * Math.random(); + return base + jitter; +} + +export function useAgentStatus(): { state: AgentState; error: string | null } { + const [state, setState] = useState('stopped'); + const [error, setError] = useState(null); + + const mountedRef = useRef(true); + const esRef = useRef(null); + const retryCountRef = useRef(0); + const retryTimerRef = useRef | null>(null); + const pollTimerRef = useRef | null>(null); + const sseFailedRef = useRef(false); + + useEffect(() => { + mountedRef.current = true; + + function handleStatusEvent(ev: MessageEvent) { + if (!mountedRef.current) return; + try { + const data: AgentStatusEvent = JSON.parse(ev.data); + setState(data.state); + setError(data.error ?? null); + retryCountRef.current = 0; // reset on successful message + } catch { + // ignore parse errors + } + } + + function closeSSE() { + if (esRef.current) { + esRef.current.removeEventListener('status', handleStatusEvent as EventListener); + esRef.current.close(); + esRef.current = null; + } + } + + function stopPolling() { + if (pollTimerRef.current) { + clearInterval(pollTimerRef.current); + pollTimerRef.current = null; + } + } + + function startPolling() { + if (pollTimerRef.current) return; + const poll = async () => { + if (!mountedRef.current) return; + try { + const res = await fetch(POLL_URL, { credentials: 'include' }); + if (!res.ok) return; + const json = await res.json(); + const data = json.data ?? json; + if (mountedRef.current) { + setState(data.state); + setError(data.error ?? null); + } + } catch { + // ignore fetch errors during polling + } + }; + poll(); // immediate first poll + pollTimerRef.current = setInterval(poll, POLL_INTERVAL_MS); + } + + function connect() { + if (!mountedRef.current) return; + closeSSE(); + + const es = new EventSource(SSE_URL, { withCredentials: true }); + esRef.current = es; + + es.addEventListener('status', handleStatusEvent as EventListener); + + es.addEventListener('open', () => { + retryCountRef.current = 0; + sseFailedRef.current = false; + stopPolling(); + }); + + es.onerror = () => { + closeSSE(); + if (!mountedRef.current) return; + + retryCountRef.current += 1; + if (retryCountRef.current <= MAX_RETRIES) { + const delay = backoffMs(retryCountRef.current - 1); + retryTimerRef.current = setTimeout(connect, delay); + } else { + // SSE exhausted — fall back to polling + sseFailedRef.current = true; + startPolling(); + } + }; + } + + function handleVisibility() { + if (document.hidden) { + closeSSE(); + stopPolling(); + if (retryTimerRef.current) { + clearTimeout(retryTimerRef.current); + retryTimerRef.current = null; + } + } else { + retryCountRef.current = 0; + sseFailedRef.current = false; + connect(); + } + } + + connect(); + document.addEventListener('visibilitychange', handleVisibility); + + return () => { + mountedRef.current = false; + closeSSE(); + stopPolling(); + if (retryTimerRef.current) { + clearTimeout(retryTimerRef.current); + retryTimerRef.current = null; + } + document.removeEventListener('visibilitychange', handleVisibility); + }; + }, []); + + return { state, error }; +} diff --git a/web/src/index.css b/web/src/index.css index df6afe4..754276c 100644 --- a/web/src/index.css +++ b/web/src/index.css @@ -365,13 +365,13 @@ select { } .custom-select-option.active { - background: var(--accent-dim); - color: var(--accent); + background: var(--surface-hover); + color: var(--text); } .custom-select-option.active.focused { - background: var(--accent-dim); - color: var(--accent); + background: var(--surface-active); + color: var(--text); } input:focus, textarea:focus, select:focus { @@ -478,8 +478,8 @@ a:hover { } .tab.active .tab-count { - background: var(--accent-dim); - color: var(--accent); + background: var(--surface-active); + color: var(--text); } /* ---- Button variants ---- */ @@ -497,13 +497,13 @@ button.btn-ghost:hover { } button.btn-danger { - background: var(--red-dim); - color: var(--red); - border: 1px solid rgba(255, 69, 58, 0.15); + background: var(--surface); + color: var(--text); + border: 1px solid var(--glass-border); } button.btn-danger:hover { - background: rgba(255, 69, 58, 0.2); + background: var(--surface-hover); opacity: 1; } @@ -536,9 +536,9 @@ button.btn-sm { } .tag-pill.active { - background: var(--accent-dim); - color: var(--accent); - border-color: rgba(10, 132, 255, 0.3); + background: var(--surface-hover); + color: var(--text); + border-color: var(--glass-border-strong); } /* ---- Form ---- */ @@ -565,15 +565,15 @@ button.btn-sm { } .alert.success { - background: var(--green-dim); - color: var(--green); - border: 1px solid rgba(48, 209, 88, 0.2); + background: rgba(255, 255, 255, 0.04); + color: var(--text-secondary); + border: 1px solid rgba(255, 255, 255, 0.08); } .alert.error { - background: var(--red-dim); - color: var(--red); - border: 1px solid rgba(255, 69, 58, 0.2); + background: rgba(255, 255, 255, 0.04); + color: var(--text-secondary); + border: 1px solid rgba(255, 255, 255, 0.08); } /* ---- File rows (table rows in Workspace, Tasks) ---- */ @@ -1003,9 +1003,9 @@ button.btn-sm { } .step-dot.active { - background: var(--accent); - color: var(--text-on-accent); - border-color: var(--accent); + background: var(--surface-active); + color: var(--text); + border-color: var(--glass-border-strong); } .step-dot.completed { @@ -1079,8 +1079,8 @@ button.btn-sm { } .provider-card.selected { - border-color: var(--accent); - background: var(--accent-dim); + border-color: rgba(255, 255, 255, 0.3); + background: rgba(255, 255, 255, 0.08); } .provider-card h3 { @@ -1207,9 +1207,13 @@ button.btn-sm { margin-bottom: 16px; font-size: 13px; line-height: 1.6; - background: var(--red-dim); - color: var(--red); - border: 1px solid rgba(255, 69, 58, 0.2); + background: rgba(255, 255, 255, 0.04); + color: var(--text-secondary); + border: 1px solid rgba(255, 255, 255, 0.08); +} + +.warning-card strong { + color: var(--text); } /* ---- Info Panel ---- */ diff --git a/web/src/lib/api.ts b/web/src/lib/api.ts index b5299ea..2be1417 100644 --- a/web/src/lib/api.ts +++ b/web/src/lib/api.ts @@ -185,7 +185,7 @@ export interface ToolRagStatus { export interface McpServerInfo { name: string; - type: 'stdio' | 'sse'; + type: 'stdio' | 'sse' | 'streamable-http'; target: string; scope: string; enabled: boolean; @@ -391,10 +391,10 @@ export const api = { }); }, - async workspaceList(path = '', recursive = false) { + async workspaceList(_path = '', _recursive = false) { const params = new URLSearchParams(); - if (path) params.set('path', path); - if (recursive) params.set('recursive', 'true'); + if (_path) params.set('path', _path); + if (_recursive) params.set('recursive', 'true'); const qs = params.toString(); return fetchAPI>(`/workspace${qs ? `?${qs}` : ''}`); }, @@ -435,8 +435,8 @@ export const api = { return fetchAPI>('/workspace/info'); }, - async tasksList(status?: string) { - const qs = status ? `?status=${status}` : ''; + async tasksList(_status?: string) { + const qs = _status ? `?status=${_status}` : ''; return fetchAPI>(`/tasks${qs}`); }, @@ -444,12 +444,12 @@ export const api = { return fetchAPI>(`/tasks/${id}`); }, - async tasksDelete(id: string) { - return fetchAPI>(`/tasks/${id}`, { method: 'DELETE' }); + async tasksDelete(_id: string) { + return fetchAPI>(`/tasks/${_id}`, { method: 'DELETE' }); }, - async tasksCancel(id: string) { - return fetchAPI>(`/tasks/${id}/cancel`, { method: 'POST' }); + async tasksCancel(_id: string) { + return fetchAPI>(`/tasks/${_id}/cancel`, { method: 'POST' }); }, async tasksCleanDone() { @@ -473,8 +473,8 @@ export const api = { }); }, - async getMarketplace(refresh = false) { - const qs = refresh ? '?refresh=true' : ''; + async getMarketplace(_refresh = false) { + const qs = _refresh ? '?refresh=true' : ''; return fetchAPI>(`/marketplace${qs}`); }, @@ -517,9 +517,7 @@ export const api = { }, connectLogs(onLog: (entry: LogEntry) => void, onError?: (error: Event) => void) { - // No token needed — HttpOnly cookie is sent automatically by the browser const url = `${API_BASE}/logs/stream`; - const eventSource = new EventSource(url); eventSource.addEventListener('log', (event) => { @@ -548,8 +546,8 @@ export const setup = { getProviders: () => fetchSetupAPI('/setup/providers'), - getModels: (provider: string) => - fetchSetupAPI(`/setup/models/${encodeURIComponent(provider)}`), + getModels: (_provider: string) => + fetchSetupAPI(`/setup/models/${encodeURIComponent(_provider)}`), validateApiKey: (provider: string, apiKey: string) => fetchSetupAPI<{ valid: boolean; error?: string }>('/setup/validate/api-key', {