Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
711 changes: 711 additions & 0 deletions docs/superpowers/plans/2026-04-10-platform-abstraction-layer.md

Large diffs are not rendered by default.

164 changes: 164 additions & 0 deletions packages/agent/src/adapters/web.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import type { Request, Response } from 'express'
import type { ResponseChunk } from '../core/types.js'
import { AgentCore } from '../core/agent-core.js'

// ─────────────────────────────────────────────────────────────────────────────
// Web Adapter — maps Express HTTP requests to AgentCore
//
// createWebAdapter() returns handlers for:
// POST /api/command → handleCommand (single-turn, JSON response)
// POST /api/chat → handleChat (multi-turn, JSON response)
// POST /api/chat/stream → handleChatStream (multi-turn, SSE stream)
// ─────────────────────────────────────────────────────────────────────────────

const MAX_MESSAGE_LENGTH = 4000

interface ChatMessage {
role: 'user' | 'assistant' | 'system'
content: string
}

/**
* Map a ResponseChunk to the SSE event format the frontend expects.
*/
function chunkToSSE(chunk: ResponseChunk): Record<string, unknown> {
switch (chunk.type) {
case 'text':
return { type: 'content_block_delta', text: chunk.text }
case 'tool_start':
return { type: 'tool_use', name: chunk.toolName, id: chunk.toolId }
case 'tool_end':
return {
type: 'tool_result',
name: chunk.toolName,
id: chunk.toolId,
success: chunk.success,
}
case 'error':
return { type: 'error', message: chunk.text }
case 'done':
return { type: 'message_complete', content: chunk.text }
}
}

/**
* Extract the wallet address set by JWT middleware.
*/
function getWallet(req: Request): string {
return (req as unknown as Record<string, unknown>).wallet as string
}

/**
* Validate the messages array from a chat request body.
* Returns the last user message content or null if invalid.
*/
function extractLastUserMessage(
messages: unknown,
): string | null {
if (!Array.isArray(messages) || messages.length === 0) return null
const userMessages = (messages as ChatMessage[]).filter(
(m) => m.role === 'user',
)
if (userMessages.length === 0) return null
return userMessages[userMessages.length - 1].content
}

export function createWebAdapter() {
const core = new AgentCore()

// ───────────────────────────────────────────────────────────────────────
// POST /api/command — single-turn command execution
// ───────────────────────────────────────────────────────────────────────

async function handleCommand(req: Request, res: Response) {
const wallet = getWallet(req)
const { message } = req.body as { message?: string }

if (!message || typeof message !== 'string') {
res.status(400).json({ error: 'message is required' })
return
}

if (message.length > MAX_MESSAGE_LENGTH) {
res.status(400).json({ error: 'message exceeds 4000 character limit' })
return
}

const response = await core.processMessage({
platform: 'web',
userId: wallet,
message,
})

res.json({ status: 'ok', wallet, response })
}

// ───────────────────────────────────────────────────────────────────────
// POST /api/chat — multi-turn chat (JSON response)
// ───────────────────────────────────────────────────────────────────────

async function handleChat(req: Request, res: Response) {
const wallet = getWallet(req)
const lastUserMsg = extractLastUserMessage(req.body?.messages)

if (!lastUserMsg) {
res.status(400).json({
error: 'messages array is required and must not be empty',
})
return
}

const response = await core.processMessage({
platform: 'web',
userId: wallet,
message: lastUserMsg,
})

res.json(response)
}

// ───────────────────────────────────────────────────────────────────────
// POST /api/chat/stream — multi-turn chat (SSE stream)
// ───────────────────────────────────────────────────────────────────────

async function handleChatStream(req: Request, res: Response) {
const wallet = getWallet(req)
const lastUserMsg = extractLastUserMessage(req.body?.messages)

if (!lastUserMsg) {
res.status(400).json({
error: 'messages array is required and must not be empty',
})
return
}

// Set SSE headers
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
res.flushHeaders()

// Track client disconnect
let aborted = false
res.on('close', () => {
aborted = true
})

try {
const ctx = { platform: 'web' as const, userId: wallet, message: lastUserMsg }
for await (const chunk of core.streamMessage(ctx)) {
if (aborted || res.writableEnded) break
res.write(`data: ${JSON.stringify(chunkToSSE(chunk))}\n\n`)
}
} catch (err) {
const message = err instanceof Error ? err.message : 'Internal error'
res.write(`data: ${JSON.stringify({ type: 'error', message })}\n\n`)
}

res.write('data: [DONE]\n\n')
res.end()
}

return { handleCommand, handleChat, handleChatStream, core }
}
112 changes: 112 additions & 0 deletions packages/agent/src/core/agent-core.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import type { MsgContext, ResponseChunk, AgentResponse } from './types.js'
import { chat, chatStream } from '../agent.js'
import {
resolveSession,
getConversation,
appendConversation,
} from '../session.js'

// ─────────────────────────────────────────────────────────────────────────────
// AgentCore — platform-agnostic message processing
//
// Wraps the LLM chat/stream functions with session management and
// conversation persistence. Any platform adapter (web, Telegram, X)
// constructs a MsgContext and hands it to AgentCore.
// ─────────────────────────────────────────────────────────────────────────────

export class AgentCore {
/**
* Process a message synchronously (non-streaming).
*
* Resolves the user's session, loads conversation history, calls the LLM,
* extracts text + tool usage, persists the conversation turn, and returns.
*/
async processMessage(ctx: MsgContext): Promise<AgentResponse> {
const session = resolveSession(ctx.userId)
const history = getConversation(session.id)

// Build messages: existing history + the new user message
const messages = [
...history.map((m) => ({ role: m.role as 'user' | 'assistant', content: m.content as string })),
{ role: 'user' as const, content: ctx.message },
]

const response = await chat(messages)

// Extract text from text blocks
const textBlocks = response.content.filter(
(b: { type: string }) => b.type === 'text'
) as { type: 'text'; text: string }[]
const text = textBlocks.map((b) => b.text).join('')

// Extract tool names from tool_use blocks
const toolUseBlocks = response.content.filter(
(b: { type: string }) => b.type === 'tool_use'
) as { type: 'tool_use'; name: string }[]
const toolsUsed = toolUseBlocks.map((b) => b.name)

// Persist the conversation turn
appendConversation(session.id, [
{ role: 'user', content: ctx.message },
{ role: 'assistant', content: text },
])

return { text, toolsUsed }
}

/**
* Process a message with streaming.
*
* Same session/history resolution, but yields ResponseChunk objects as
* SSE events arrive from the LLM. Persists the conversation after the
* stream completes, then yields a final 'done' chunk.
*/
async *streamMessage(ctx: MsgContext): AsyncGenerator<ResponseChunk> {
const session = resolveSession(ctx.userId)
const history = getConversation(session.id)

const messages = [
...history.map((m) => ({ role: m.role as 'user' | 'assistant', content: m.content as string })),
{ role: 'user' as const, content: ctx.message },
]

let fullText = ''

for await (const event of chatStream(messages)) {
switch (event.type) {
case 'content_block_delta':
yield { type: 'text', text: event.text }
break

case 'tool_use':
yield { type: 'tool_start', toolName: event.name, toolId: event.id }
break

case 'tool_result':
yield {
type: 'tool_end',
toolName: event.name,
toolId: event.id,
success: event.success,
}
break

case 'error':
yield { type: 'error', text: event.message }
break

case 'message_complete':
fullText = event.content
break
}
}

// Persist the completed conversation turn
appendConversation(session.id, [
{ role: 'user', content: ctx.message },
{ role: 'assistant', content: fullText },
])

yield { type: 'done', text: fullText }
}
}
7 changes: 7 additions & 0 deletions packages/agent/src/core/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export type {
Platform,
MsgContext,
ResponseChunk,
AgentResponse,
} from './types.js'
export { AgentCore } from './agent-core.js'
32 changes: 32 additions & 0 deletions packages/agent/src/core/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/** Platform a message originated from */
export type Platform = 'web' | 'telegram' | 'x'

/**
* Unified inbound message context — platform-agnostic.
* Every adapter constructs this from its native format.
*/
export interface MsgContext {
/** Platform the message came from */
platform: Platform
/** User identifier — wallet address (web), telegram user ID, X user ID */
userId: string
/** The user's message text */
message: string
/** Optional metadata from the platform (thread ID, reply-to, etc.) */
metadata?: Record<string, unknown>
}

/** A single response chunk for streaming */
export interface ResponseChunk {
type: 'text' | 'tool_start' | 'tool_end' | 'error' | 'done'
text?: string
toolName?: string
toolId?: string
success?: boolean
}

/** Full (non-streaming) agent response */
export interface AgentResponse {
text: string
toolsUsed: string[]
}
Loading
Loading