Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ hyp smoke walkthrough_picker_to_first_query
hyp smoke client_attach_idempotent
hyp smoke gateway_claude_capture
hyp smoke gateway_codex_capture
hyp smoke hypignore_capture_drop
hyp smoke otel_loopback_capture
hyp smoke local_parquet_export
hyp smoke status_diagnostics
Expand Down
39 changes: 36 additions & 3 deletions hypaware-core/plugins-workspace/claude/src/backfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import {
import { pickLatestMatching, readSessionContext } from './session_context.js'
import { deriveRepoFromCwd } from './git_repo.js'
import { anthropicMessageAttributes } from './anthropic.js'
import { createUsagePolicyResolver } from '../../../../src/core/usage-policy/index.js'

/**
* @import { AiGatewayProjectedExchange, AiGatewayProjectedMessage, BackfillContribution, BackfillItem, BackfillProvenance, BackfillRunContext, JsonObject, PluginLogger } from '../../../../collectivus-plugin-kernel-types.js'
* @import { SessionContextRecord, TranscriptEntry } from './types.js'
* @import { UsagePolicyResolver } from '../../../../src/core/usage-policy/types.js'
*/

/**
Expand Down Expand Up @@ -66,6 +68,7 @@ const DAY_MS = 24 * 60 * 60 * 1000
* clientName?: string,
* pluginName?: string,
* deriveRepo?: (cwd: string | undefined) => Promise<{ git_remote?: string, repo_root?: string }>,
* resolver?: UsagePolicyResolver,
* }} opts
* @returns {BackfillContribution}
*/
Expand All @@ -78,14 +81,18 @@ export function createClaudeBackfillProvider(opts) {
// recover it by running git in the session's cwd at backfill time. Injectable
// so tests stub the git lookup and stay hermetic.
const deriveRepo = opts.deriveRepo ?? deriveRepoFromCwd
// One resolver per provider instance (LLP 0050): it is created here, outside
// run(), so its per-cwd cache is shared across the whole scan and across any
// repeated run() calls on this provider. Injectable for hermetic tests.
// @ref LLP 0050 [implements]: skip ignored sessions at the capture seam.
const resolver = opts.resolver ?? createUsagePolicyResolver()

return {
name: clientName,
plugin: pluginName,
datasets: [AI_GATEWAY_MESSAGES_DATASET],
summary: 'Import local Claude Code transcripts into ai_gateway_messages',
async *run(ctx) {
yield* runClaudeBackfill({ ctx, projectsDir, stateFile, clientName, deriveRepo })
yield* runClaudeBackfill({ ctx, projectsDir, stateFile, clientName, deriveRepo, resolver })
},
}
}
Expand All @@ -103,11 +110,12 @@ export function createClaudeBackfillProvider(opts) {
* stateFile: string,
* clientName: string,
* deriveRepo: (cwd: string | undefined) => Promise<{ git_remote?: string, repo_root?: string }>,
* resolver: UsagePolicyResolver,
* }} args
* @returns {AsyncGenerator<BackfillItem>}
*/
async function* runClaudeBackfill(args) {
const { ctx, projectsDir, stateFile, clientName, deriveRepo } = args
const { ctx, projectsDir, stateFile, clientName, deriveRepo, resolver } = args
const log = ctx.log
const window = resolveWindow(ctx)
// Many sessions share a cwd (the same repo, often the same checkout), and
Expand Down Expand Up @@ -166,11 +174,36 @@ async function* runClaudeBackfill(args) {

for (const [sessionId, sessionEntries] of groupBySession(entries)) {
const windowed = filterByWindow(sessionEntries, window)
const record = pickLatestMatching(sessionRecords, { sessionId, transcriptPath: filePath })

// @ref LLP 0050 [implements]: capture-seam drop for backfill. Skip an
// ignored session BEFORE projecting/writing it, else `hyp backfill` would
// silently re-import the exact sessions ignored live (LLP 0049#requirements
// R1). The cwd precedence mirrors projectedExchangeFromEntries (the
// hook-written record wins, else the first transcript line's cwd), so the
// session is tested on the same cwd the row would have carried.
const sessionCwd = record?.cwd ?? windowed.find((entry) => entry.cwd)?.cwd
const sessionPolicy = sessionCwd ? resolver.resolve(sessionCwd) : null
if (sessionPolicy?.class === 'ignore') {
// A fail-safe clamp (declared token unimplemented) escalates to warn
// so an operator can tell it from an intended ignore (R3 SHOULD).
log[sessionPolicy.warn ? 'warn' : 'info']('claude.backfill.usage_policy_drop', {
component: 'plugin.claude.backfill',
operation: 'usage_policy_drop',
session_id: sessionId,
declared: sessionPolicy.declared,
governed_by: sessionPolicy.governedBy,
status: 'ok',
...(sessionPolicy.warn ? { warn: sessionPolicy.warn } : {}),
})
continue
}

const exchange = await projectedExchangeFromEntries({
sessionId,
entries: windowed,
clientName,
record: pickLatestMatching(sessionRecords, { sessionId, transcriptPath: filePath }),
record,
agentMeta,
deriveRepo: deriveRepoCached,
})
Expand Down
34 changes: 34 additions & 0 deletions hypaware-core/plugins-workspace/claude/src/projector.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ import {
pickLatestMatching,
readSessionContext,
} from './session_context.js'
import { createUsagePolicyResolver } from '../../../../src/core/usage-policy/index.js'

/**
* @import { AiGatewayExchangeInput, AiGatewayExchangeProjector, AiGatewayProjectedExchange, AiGatewayProjectedMessage, AiGatewayUpstreamPreset, JsonObject } from '../../../../collectivus-plugin-kernel-types.js'
* @import { TranscriptEntry } from './types.js'
* @import { UsagePolicyResolver } from '../../../../src/core/usage-policy/types.js'
*/

/**
Expand Down Expand Up @@ -80,6 +82,7 @@ import {
* projectsDir?: string,
* clientName?: string,
* logger?: { warn(message: string, fields?: Record<string, unknown>): void, debug?: (m: string, f?: Record<string, unknown>) => void },
* resolver?: UsagePolicyResolver,
* }} opts
* @returns {AiGatewayExchangeProjector}
*/
Expand All @@ -89,6 +92,11 @@ export function createClaudeExchangeProjector(opts) {
const clientName = opts.clientName ?? 'claude'
const logger = opts.logger
const sessionContextCache = createSessionContextCache()
// One resolver per projector (per daemon run): the per-cwd cache rides the
// projector's lifetime so the capture hot path adds no unbounded fs work.
// @ref LLP 0050 [implements]: the .hypignore capture-seam drop lives in the
// client adapter, the only place that resolves a cwd; injectable for tests.
const resolver = opts.resolver ?? createUsagePolicyResolver()

return {
name: 'claude-anthropic-messages',
Expand Down Expand Up @@ -152,6 +160,32 @@ export function createClaudeExchangeProjector(opts) {
sessionId,
})
: undefined

// @ref LLP 0050 [implements]: capture-seam drop. Once the exchange's cwd
// is resolved, an ancestor `.hypignore` that resolves to `ignore` means
// this exchange is never recorded: return no rows BEFORE building any, so
// the gateway source's write guard (`if (messageRows.length > 0)`)
// persists nothing. The response has already streamed to the client, so
// the live LLM call is untouched (LLP 0049#requirements R2). Returning
// `undefined` is this projector's "no rows" signal (the dispatcher treats
// it, and an empty `messages`, identically): a literal `[]` is not a valid
// projection and would log a spurious invalid-output warning.
const cwd = sessionContextRecord?.cwd
const policy = cwd ? resolver.resolve(cwd) : null
if (policy?.class === 'ignore') {
// A fail-safe clamp (declared token unimplemented) escalates to warn
// so an operator can tell it from an intended ignore (R3 SHOULD).
ctx.log[policy.warn ? 'warn' : 'info']('plugin.claude.usage_policy_drop', {
component: 'claude',
operation: 'usage_policy_drop',
exchange_id: input.exchange_id,
declared: policy.declared,
governed_by: policy.governedBy,
...(policy.warn ? { warn: policy.warn } : {}),
})
return undefined
}

const transcriptEntries = sessionId
? await loadTranscriptSafe({
projectsDir,
Expand Down
36 changes: 34 additions & 2 deletions hypaware-core/plugins-workspace/codex/src/backfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import fs from 'node:fs/promises'
import path from 'node:path'

import { createUsagePolicyResolver } from '../../../../src/core/usage-policy/index.js'
import { redactRemoteUserinfo } from './git-remote.js'

/**
* @import { AiGatewayProjectedExchange, AiGatewayProjectedMessage, BackfillContribution, BackfillEvent, BackfillItem, BackfillProvenance, BackfillRunContext, JsonObject, JsonValue, PluginLogger } from '../../../../collectivus-plugin-kernel-types.js'
* @import { CodexRolloutItem, CodexRolloutSession } from './types.js'
* @import { UsagePolicyResolver } from '../../../../src/core/usage-policy/types.js'
*/

/**
Expand Down Expand Up @@ -74,6 +76,7 @@ const DAY_MS = 24 * 60 * 60 * 1000
* unsupportedLocations?: Array<{ kind: string, path: string }>,
* clientName?: string,
* pluginName?: string,
* resolver?: UsagePolicyResolver,
* }} opts
* @returns {BackfillContribution}
*/
Expand All @@ -83,14 +86,17 @@ export function createCodexBackfillProvider(opts) {
const codexHome = opts.codexHome ?? defaultCodexHome(opts.homeDir)
const sessionsDir = opts.sessionsDir ?? path.join(codexHome, 'sessions')
const unsupportedLocations = opts.unsupportedLocations ?? defaultUnsupportedLocations(opts.homeDir)
// One `.hypignore` resolver per provider instance: it is created here, outside
// run(), so its per-cwd cache is held for the whole scan and is reused by any
// later run() on this provider (LLP 0049 R6).
const resolver = opts.resolver ?? createUsagePolicyResolver()

return {
name: clientName,
plugin: pluginName,
datasets: [AI_GATEWAY_MESSAGES_DATASET],
summary: 'Import local Codex session rollouts into ai_gateway_messages',
async *run(ctx) {
yield* runCodexBackfill({ ctx, codexHome, sessionsDir, unsupportedLocations, clientName })
yield* runCodexBackfill({ ctx, codexHome, sessionsDir, unsupportedLocations, clientName, resolver })
},
}
}
Expand Down Expand Up @@ -131,11 +137,12 @@ function defaultUnsupportedLocations(homeDir) {
* sessionsDir: string,
* unsupportedLocations: Array<{ kind: string, path: string }>,
* clientName: string,
* resolver: UsagePolicyResolver,
* }} args
* @returns {AsyncGenerator<BackfillItem | BackfillEvent>}
*/
async function* runCodexBackfill(args) {
const { ctx, codexHome, sessionsDir, unsupportedLocations, clientName } = args
const { ctx, codexHome, sessionsDir, unsupportedLocations, clientName, resolver } = args
const log = ctx.log
const window = resolveWindow(ctx)

Expand All @@ -158,6 +165,7 @@ async function* runCodexBackfill(args) {

let filesSeen = 0
let sessionsProjected = 0
let sessionsIgnored = 0
let messagesProjected = 0

for (const filePath of await listRolloutFiles(sessionsDir)) {
Expand All @@ -180,6 +188,29 @@ async function* runCodexBackfill(args) {
}

for (const session of sessions) {
// @ref LLP 0050 [implements]: capture-seam drop for backfill, symmetric
// to the @hypaware/claude backfill skip. A session whose recorded cwd has
// an ancestor `.hypignore` of class `ignore` is skipped before projecting
// or yielding any row, so `hyp backfill` never re-imports the exact
// sessions ignored live (LLP 0049 R1).
const sessionPolicy = session.cwd ? resolver.resolve(session.cwd) : null
if (sessionPolicy?.class === 'ignore') {
sessionsIgnored += 1
// A fail-safe clamp (declared token unimplemented) escalates to warn
// so an operator can tell it from an intended ignore (R3 SHOULD).
log[sessionPolicy.warn ? 'warn' : 'info']('codex.backfill.usage_policy_drop', {
component: COMPONENT,
operation: 'usage_policy_drop',
conversation_id: session.sessionId,
class: 'ignore',
declared: sessionPolicy.declared,
governed_by: sessionPolicy.governedBy,
status: 'skipped',
...(sessionPolicy.warn ? { warn: sessionPolicy.warn } : {}),
})
continue
}

const exchange = projectedExchangeFromSession({
session,
items: filterByWindow(session.items, window),
Expand Down Expand Up @@ -211,6 +242,7 @@ async function* runCodexBackfill(args) {
operation: 'backfill.scan',
files_seen: filesSeen,
sessions_projected: sessionsProjected,
sessions_ignored: sessionsIgnored,
messages_projected: messagesProjected,
status: 'ok',
})
Expand Down
41 changes: 39 additions & 2 deletions hypaware-core/plugins-workspace/codex/src/exchange-projector.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import { createHash } from 'node:crypto'

import { createUsagePolicyResolver } from '../../../../src/core/usage-policy/index.js'
import { redactRemoteUserinfo } from './git-remote.js'

/**
* @import { AiGatewayExchangeInput, AiGatewayExchangeProjector, AiGatewayProjectedExchange, AiGatewayProjectedMessage, JsonObject } from '../../../../collectivus-plugin-kernel-types.js'
* @import { CodexLogReader } from './types.js'
* @import { UsagePolicyResolver } from '../../../../src/core/usage-policy/types.js'
*/

/**
Expand All @@ -27,6 +29,7 @@ import { redactRemoteUserinfo } from './git-remote.js'
* @param {{
* logReaders?: CodexLogReader[],
* env?: Record<string, string | undefined>,
* resolver?: UsagePolicyResolver,
* }} [opts]
* @returns {AiGatewayExchangeProjector}
*/
Expand All @@ -36,6 +39,10 @@ export function createCodexExchangeProjector(opts = {}) {
const logReaders = sqliteReadsEnabled && Array.isArray(opts.logReaders)
? opts.logReaders
: []
// One `.hypignore` resolver per projector instance (one per started
// listener): the per-cwd cache then spans the listener's lifetime so the
// capture hot path adds no unbounded fs work (LLP 0049 R6).
const resolver = opts.resolver ?? createUsagePolicyResolver()

return {
name: 'codex-exchange',
Expand All @@ -54,14 +61,44 @@ export function createCodexExchangeProjector(opts = {}) {
return false
},

/** @param {AiGatewayExchangeInput} input */
project(input) {
/**
* @param {AiGatewayExchangeInput} input
* @param {{ log?: { info?: (m: string, f?: Record<string, unknown>) => void, warn?: (m: string, f?: Record<string, unknown>) => void } }} [ctx]
*/
project(input, ctx) {
const reqBody = parseMaybeJson(input.request_body)
if (!isPlainObject(reqBody)) return undefined

const path = input.path ?? ''
const provider = resolveProvider(input, reqBody, path)
const codexContext = resolveCodexContext(input, provider, path, reqBody)

// @ref LLP 0050 [implements]: capture-seam drop, symmetric to the
// @hypaware/claude projector. Once this exchange's cwd is resolved, an
// ancestor `.hypignore` of class `ignore` drops the row by returning no
// projection (the gateway source's `messageRows.length > 0` write guard
// then persists nothing). The response has already streamed, so the live
// call is untouched: only persistence is suppressed (LLP 0049 R1/R2).
// This is the same cwd `resolveRecordedContext` would stamp on the row.
const cwd = firstString(codexContext?.cwd, readRecordedCwd(reqBody))
if (cwd) {
const policy = resolver.resolve(cwd)
if (policy.class === 'ignore') {
// `declared` distinguishes an intended `ignore` from a fail-safe clamp
// of an unimplemented token; on a clamp escalate to warn (R3 SHOULD).
ctx?.log?.[policy.warn ? 'warn' : 'info']?.('plugin.codex.usage_policy_drop', {
component: 'codex',
operation: 'usage_policy_drop',
class: policy.class,
declared: policy.declared,
governed_by: policy.governedBy,
cwd_sha256: sha256Hex(cwd).slice(0, 16),
...(policy.warn ? { warn: policy.warn } : {}),
})
return undefined
}
}

const responseBody = parseMaybeJson(input.response_body)
const streamEvents = Array.isArray(input.stream_events) ? input.stream_events : []
const messages = messagesForTransport({ provider, path, reqBody, responseBody, streamEvents })
Expand Down
Loading