Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
645 changes: 645 additions & 0 deletions docs/superpowers/plans/2026-04-10-herald-x-agent.md

Large diffs are not rendered by default.

172 changes: 172 additions & 0 deletions packages/agent/src/adapters/x.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import type { Tool } from '@mariozechner/pi-ai'
import type Anthropic from '@anthropic-ai/sdk'
import type { GuardianEvent } from '../coordination/event-bus.js'
import { AgentCore } from '../core/agent-core.js'
import { HERALD_SYSTEM_PROMPT, HERALD_TOOLS, HERALD_TOOL_EXECUTORS } from '../herald/herald.js'
import { getBudgetStatus } from '../herald/budget.js'
import { guardianBus } from '../coordination/event-bus.js'

// ─────────────────────────────────────────────────────────────────────────────
// X Adapter — subscribes to herald:mention / herald:dm events on guardianBus
// and routes them through AgentCore for LLM-powered responses.
// ─────────────────────────────────────────────────────────────────────────────

/**
* Convert Pi SDK Tool[] to Anthropic Tool[] format.
* Pi SDK uses `parameters`, Anthropic expects `input_schema`.
*/
export function toAnthropicTools(piTools: Tool[]): Anthropic.Tool[] {
return piTools.map((tool) => ({
name: tool.name,
description: tool.description,
input_schema: tool.parameters as unknown as Anthropic.Tool['input_schema'],
}))
}

/**
* Build a tool executor that dispatches to HERALD_TOOL_EXECUTORS by name.
* Throws if the tool is not registered.
*/
function heraldToolExecutor(name: string, input: Record<string, unknown>): Promise<unknown> {
const executor = HERALD_TOOL_EXECUTORS[name]
if (!executor) {
throw new Error(`Unknown HERALD tool: ${name}`)
}
return executor(input)
}

/**
* Create the X adapter — HERALD's LLM brain for mentions and DMs.
*
* Subscribes to `herald:mention` and `herald:dm` events on the guardianBus,
* processes them through AgentCore, and auto-replies when the LLM doesn't
* already use `replyTweet` or `sendDM` tools.
*/
export function createXAdapter() {
const core = new AgentCore({
systemPrompt: HERALD_SYSTEM_PROMPT,
tools: toAnthropicTools(HERALD_TOOLS),
toolExecutor: heraldToolExecutor,
model: process.env.HERALD_MODEL ?? 'anthropic/claude-sonnet-4-6',
})

// ───────────────────────────────────────────────────────────────────────
// herald:mention handler
// ───────────────────────────────────────────────────────────────────────

async function handleMention(event: GuardianEvent): Promise<void> {
const { mentionId, authorId, text, intent } = event.data as {
mentionId: string
authorId: string | null
text: string
intent: string
}

if (intent === 'spam') return

const { gate } = getBudgetStatus()
if (gate === 'paused' || gate === 'dm-only') return

const response = await core.processMessage({
platform: 'x',
userId: authorId ?? 'unknown',
message: text,
metadata: { mentionId, intent },
})

// Auto-reply if the LLM produced text but didn't already call replyTweet
if (response.text && !response.toolsUsed.includes('replyTweet')) {
await heraldToolExecutor('replyTweet', {
tweet_id: mentionId,
text: response.text.slice(0, 280),
})
}

guardianBus.emit({
source: 'herald',
type: 'herald:reply-sent',
level: 'routine',
data: { mentionId, toolsUsed: response.toolsUsed },
timestamp: new Date().toISOString(),
})
}

// ───────────────────────────────────────────────────────────────────────
// herald:dm handler
// ───────────────────────────────────────────────────────────────────────

async function handleDM(event: GuardianEvent): Promise<void> {
const { dmId, senderId, text, intent } = event.data as {
dmId: string
senderId: string | null
text: string
intent: string
}

if (intent === 'spam') return

const { gate } = getBudgetStatus()
if (gate === 'paused') return

const response = await core.processMessage({
platform: 'x',
userId: senderId ?? 'unknown',
message: text,
metadata: { dmId, intent, isDM: true },
})

// Auto-reply if the LLM produced text but didn't already call sendDM
if (response.text && !response.toolsUsed.includes('sendDM')) {
await heraldToolExecutor('sendDM', {
user_id: senderId,
text: response.text,
})
}

guardianBus.emit({
source: 'herald',
type: 'herald:dm-replied',
level: 'routine',
data: { dmId, toolsUsed: response.toolsUsed },
timestamp: new Date().toISOString(),
})
}

// ───────────────────────────────────────────────────────────────────────
// Wrapped handlers — try/catch to never crash the event bus
// ───────────────────────────────────────────────────────────────────────

guardianBus.on('herald:mention', (event: GuardianEvent) => {
handleMention(event).catch((err) => {
const message = err instanceof Error ? err.message : String(err)
guardianBus.emit({
source: 'herald',
type: 'herald:reply-failed',
level: 'important',
data: {
mentionId: (event.data as Record<string, unknown>).mentionId,
error: message,
},
timestamp: new Date().toISOString(),
})
})
})

guardianBus.on('herald:dm', (event: GuardianEvent) => {
handleDM(event).catch((err) => {
const message = err instanceof Error ? err.message : String(err)
guardianBus.emit({
source: 'herald',
type: 'herald:dm-reply-failed',
level: 'important',
data: {
dmId: (event.data as Record<string, unknown>).dmId,
error: message,
},
timestamp: new Date().toISOString(),
})
})
})

return { core, handleMention, handleDM }
}
22 changes: 16 additions & 6 deletions packages/agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ export interface AgentOptions {
model?: string
maxTokens?: number
apiKey?: string
/** System prompt override (defaults to SIPHER's SYSTEM_PROMPT) */
systemPrompt?: string
/** Tool definitions override (defaults to SIPHER's TOOLS) */
tools?: Anthropic.Tool[]
/** Custom tool executor (defaults to SIPHER's executeTool) */
toolExecutor?: (name: string, input: Record<string, unknown>) => Promise<unknown>
}

// ─── SSE event types emitted by chatStream ──────────────────────────────────
Expand Down Expand Up @@ -222,8 +228,8 @@ export async function chat(
const response = await client.messages.create({
model,
max_tokens: maxTokens,
system: SYSTEM_PROMPT,
tools: TOOLS,
system: options.systemPrompt ?? SYSTEM_PROMPT,
tools: options.tools ?? TOOLS,
messages: conversationMessages,
})

Expand All @@ -244,9 +250,11 @@ export async function chat(
// Execute tools and collect results
const toolResults: Anthropic.ToolResultBlockParam[] = []

const execute = options.toolExecutor ?? executeTool

for (const block of toolUseBlocks) {
try {
const result = await executeTool(block.name, block.input)
const result = await execute(block.name, block.input)
toolResults.push({
type: 'tool_result',
tool_use_id: block.id,
Expand Down Expand Up @@ -295,8 +303,8 @@ export async function* chatStream(
const stream = client.messages.stream({
model,
max_tokens: maxTokens,
system: SYSTEM_PROMPT,
tools: TOOLS,
system: options.systemPrompt ?? SYSTEM_PROMPT,
tools: options.tools ?? TOOLS,
messages: conversationMessages,
})

Expand Down Expand Up @@ -329,11 +337,13 @@ export async function* chatStream(

const toolResults: Anthropic.ToolResultBlockParam[] = []

const execute = options.toolExecutor ?? executeTool

for (const block of toolUseBlocks) {
yield { type: 'tool_use', name: block.name, id: block.id }

try {
const result = await executeTool(block.name, block.input)
const result = await execute(block.name, block.input)
toolResults.push({
type: 'tool_result',
tool_use_id: block.id,
Expand Down
22 changes: 19 additions & 3 deletions packages/agent/src/core/agent-core.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { MsgContext, ResponseChunk, AgentResponse } from './types.js'
import type { MsgContext, ResponseChunk, AgentResponse, AgentConfig } from './types.js'
import { chat, chatStream } from '../agent.js'
import {
resolveSession,
Expand All @@ -15,6 +15,12 @@ import {
// ─────────────────────────────────────────────────────────────────────────────

export class AgentCore {
private config: AgentConfig

constructor(config: AgentConfig = {}) {
this.config = config
}

/**
* Process a message synchronously (non-streaming).
*
Expand All @@ -31,7 +37,12 @@ export class AgentCore {
{ role: 'user' as const, content: ctx.message },
]

const response = await chat(messages)
const response = await chat(messages, {
systemPrompt: this.config.systemPrompt,
tools: this.config.tools,
toolExecutor: this.config.toolExecutor,
model: this.config.model,
})

// Extract text from text blocks
const textBlocks = response.content.filter(
Expand Down Expand Up @@ -72,7 +83,12 @@ export class AgentCore {

let fullText = ''

for await (const event of chatStream(messages)) {
for await (const event of chatStream(messages, {
systemPrompt: this.config.systemPrompt,
tools: this.config.tools,
toolExecutor: this.config.toolExecutor,
model: this.config.model,
})) {
switch (event.type) {
case 'content_block_delta':
yield { type: 'text', text: event.text }
Expand Down
1 change: 1 addition & 0 deletions packages/agent/src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ export type {
MsgContext,
ResponseChunk,
AgentResponse,
AgentConfig,
} from './types.js'
export { AgentCore } from './agent-core.js'
10 changes: 10 additions & 0 deletions packages/agent/src/core/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
import type Anthropic from '@anthropic-ai/sdk'

/** Configuration for an agent identity (tools, prompt, model) */
export interface AgentConfig {
systemPrompt?: string
tools?: Anthropic.Tool[]
toolExecutor?: (name: string, input: Record<string, unknown>) => Promise<unknown>
model?: string
}

/** Platform a message originated from */
export type Platform = 'web' | 'telegram' | 'x'

Expand Down
14 changes: 11 additions & 3 deletions packages/agent/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,22 @@ const server = app.listen(PORT, () => {
console.log(` Squad: http://localhost:${PORT}/api/squad`)
console.log(` Herald: http://localhost:${PORT}/api/herald`)

// Start HERALD poller only when X API credentials are present
// Start HERALD (X agent) only when X API credentials are present
if (process.env.X_BEARER_TOKEN && process.env.X_CONSUMER_KEY) {
import('./herald/poller.js').then(({ createPollerState, startPoller }) => {
Promise.all([
import('./herald/poller.js'),
import('./adapters/x.js'),
]).then(([{ createPollerState, startPoller }, { createXAdapter }]) => {
// Start X adapter first (subscribes to events before poller emits them)
createXAdapter()
console.log(' HERALD: X adapter started (LLM brain for mentions + DMs)')

// Then start poller (emits events the adapter handles)
const heraldState = createPollerState()
startPoller(heraldState)
console.log(' HERALD: poller started (mentions + DMs + scheduled posts)')
}).catch(err => {
console.warn(' HERALD: poller not started:', (err as Error).message)
console.warn(' HERALD: not started:', (err as Error).message)
})
}
})
Expand Down
Loading
Loading