Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions hypaware-core/plugins-workspace/codex/src/backfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import fs from 'node:fs/promises'
import path from 'node:path'

import { createUsagePolicyResolver } from '../../../../src/core/usage-policy/index.js'
import { redactRemoteUserinfo } from './git-remote.js'

/**
* @import { AiGatewayProjectedExchange, AiGatewayProjectedMessage, BackfillContribution, BackfillEvent, BackfillItem, BackfillProvenance, BackfillRunContext, JsonObject, JsonValue, PluginLogger } from '../../../../collectivus-plugin-kernel-types.js'
* @import { CodexRolloutItem, CodexRolloutSession } from './types.js'
* @import { UsagePolicyResolver } from '../../../../src/core/usage-policy/types.js'
*/

/**
Expand Down Expand Up @@ -74,6 +76,7 @@ const DAY_MS = 24 * 60 * 60 * 1000
* unsupportedLocations?: Array<{ kind: string, path: string }>,
* clientName?: string,
* pluginName?: string,
* resolver?: UsagePolicyResolver,
* }} opts
* @returns {BackfillContribution}
*/
Expand All @@ -83,14 +86,17 @@ export function createCodexBackfillProvider(opts) {
const codexHome = opts.codexHome ?? defaultCodexHome(opts.homeDir)
const sessionsDir = opts.sessionsDir ?? path.join(codexHome, 'sessions')
const unsupportedLocations = opts.unsupportedLocations ?? defaultUnsupportedLocations(opts.homeDir)
// One `.hypignore` resolver per backfill run, holding its per-cwd cache for
// the whole scan (LLP 0049 R6).
const resolver = opts.resolver ?? createUsagePolicyResolver()

return {
name: clientName,
plugin: pluginName,
datasets: [AI_GATEWAY_MESSAGES_DATASET],
summary: 'Import local Codex session rollouts into ai_gateway_messages',
async *run(ctx) {
yield* runCodexBackfill({ ctx, codexHome, sessionsDir, unsupportedLocations, clientName })
yield* runCodexBackfill({ ctx, codexHome, sessionsDir, unsupportedLocations, clientName, resolver })
},
}
}
Expand Down Expand Up @@ -131,11 +137,12 @@ function defaultUnsupportedLocations(homeDir) {
* sessionsDir: string,
* unsupportedLocations: Array<{ kind: string, path: string }>,
* clientName: string,
* resolver: UsagePolicyResolver,
* }} args
* @returns {AsyncGenerator<BackfillItem | BackfillEvent>}
*/
async function* runCodexBackfill(args) {
const { ctx, codexHome, sessionsDir, unsupportedLocations, clientName } = args
const { ctx, codexHome, sessionsDir, unsupportedLocations, clientName, resolver } = args
const log = ctx.log
const window = resolveWindow(ctx)

Expand All @@ -158,6 +165,7 @@ async function* runCodexBackfill(args) {

let filesSeen = 0
let sessionsProjected = 0
let sessionsIgnored = 0
let messagesProjected = 0

for (const filePath of await listRolloutFiles(sessionsDir)) {
Expand All @@ -180,6 +188,24 @@ async function* runCodexBackfill(args) {
}

for (const session of sessions) {
// @ref LLP 0050 [implements]: capture-seam drop for backfill, symmetric
// to the @hypaware/claude backfill skip. A session whose recorded cwd has
// an ancestor `.hypignore` of class `ignore` is skipped before projecting
// or yielding any row, so `hyp backfill` never re-imports the exact
// sessions ignored live (LLP 0049 R1).
if (session.cwd && resolver.resolve(session.cwd).class === 'ignore') {
sessionsIgnored += 1
log.info('codex.backfill.usage_policy_drop', {
component: COMPONENT,
operation: 'usage_policy_drop',
conversation_id: session.sessionId,
class: 'ignore',
governed_by: resolver.resolve(session.cwd).governedBy,
status: 'skipped',
})
continue
}

const exchange = projectedExchangeFromSession({
session,
items: filterByWindow(session.items, window),
Expand Down Expand Up @@ -211,6 +237,7 @@ async function* runCodexBackfill(args) {
operation: 'backfill.scan',
files_seen: filesSeen,
sessions_projected: sessionsProjected,
sessions_ignored: sessionsIgnored,
messages_projected: messagesProjected,
status: 'ok',
})
Expand Down
37 changes: 35 additions & 2 deletions hypaware-core/plugins-workspace/codex/src/exchange-projector.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import { createHash } from 'node:crypto'

import { createUsagePolicyResolver } from '../../../../src/core/usage-policy/index.js'
import { redactRemoteUserinfo } from './git-remote.js'

/**
* @import { AiGatewayExchangeInput, AiGatewayExchangeProjector, AiGatewayProjectedExchange, AiGatewayProjectedMessage, JsonObject } from '../../../../collectivus-plugin-kernel-types.js'
* @import { CodexLogReader } from './types.js'
* @import { UsagePolicyResolver } from '../../../../src/core/usage-policy/types.js'
*/

/**
Expand All @@ -27,6 +29,7 @@ import { redactRemoteUserinfo } from './git-remote.js'
* @param {{
* logReaders?: CodexLogReader[],
* env?: Record<string, string | undefined>,
* resolver?: UsagePolicyResolver,
* }} [opts]
* @returns {AiGatewayExchangeProjector}
*/
Expand All @@ -36,6 +39,10 @@ export function createCodexExchangeProjector(opts = {}) {
const logReaders = sqliteReadsEnabled && Array.isArray(opts.logReaders)
? opts.logReaders
: []
// One `.hypignore` resolver per projector instance (one per started
// listener): the per-cwd cache then spans the listener's lifetime so the
// capture hot path adds no unbounded fs work (LLP 0049 R6).
const resolver = opts.resolver ?? createUsagePolicyResolver()

return {
name: 'codex-exchange',
Expand All @@ -54,14 +61,40 @@ export function createCodexExchangeProjector(opts = {}) {
return false
},

/** @param {AiGatewayExchangeInput} input */
project(input) {
/**
* @param {AiGatewayExchangeInput} input
* @param {{ log?: { info?: (m: string, f?: Record<string, unknown>) => void } }} [ctx]
*/
project(input, ctx) {
const reqBody = parseMaybeJson(input.request_body)
if (!isPlainObject(reqBody)) return undefined

const path = input.path ?? ''
const provider = resolveProvider(input, reqBody, path)
const codexContext = resolveCodexContext(input, provider, path, reqBody)

// @ref LLP 0050 [implements]: capture-seam drop, symmetric to the
// @hypaware/claude projector. Once this exchange's cwd is resolved, an
// ancestor `.hypignore` of class `ignore` drops the row by returning no
// projection (the gateway source's `messageRows.length > 0` write guard
// then persists nothing). The response has already streamed, so the live
// call is untouched — only persistence is suppressed (LLP 0049 R1/R2).
// This is the same cwd `resolveRecordedContext` would stamp on the row.
const cwd = firstString(codexContext?.cwd, readRecordedCwd(reqBody))
if (cwd) {
const policy = resolver.resolve(cwd)
if (policy.class === 'ignore') {
ctx?.log?.info?.('plugin.codex.usage_policy_drop', {
component: 'codex',
operation: 'usage_policy_drop',
class: policy.class,
governed_by: policy.governedBy,
cwd_sha256: sha256Hex(cwd).slice(0, 16),
})
return undefined
}
}

const responseBody = parseMaybeJson(input.response_body)
const streamEvents = Array.isArray(input.stream_events) ? input.stream_events : []
const messages = messagesForTransport({ provider, path, reqBody, responseBody, streamEvents })
Expand Down
67 changes: 67 additions & 0 deletions test/plugins/codex-backfill.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,23 @@ import {
aiGatewayBackfillMaterializer,
} from '../../hypaware-core/plugins-workspace/ai-gateway/src/dataset.js'
import { createCodexBackfillProvider } from '../../hypaware-core/plugins-workspace/codex/src/backfill.js'
import { createUsagePolicyResolver } from '../../src/core/usage-policy/index.js'

/**
* A real usage-policy resolver wired to an injected fs that reports exactly one
* governing `.hypignore` (class `ignore`) at `ignoredDir`. Mirrors the T2
* @hypaware/claude backfill drop test: exercise the shared matcher, not a stub.
* @ref LLP 0050 [tests]: the codex backfill capture-seam skip
*
* @param {string} ignoredDir
*/
function ignoringResolver(ignoredDir) {
const hypignore = path.join(ignoredDir, '.hypignore')
return createUsagePolicyResolver({
existsSync: (p) => p === hypignore,
readFileSync: () => 'ignore\n',
})
}

/**
* End-to-end tests for the `@hypaware/codex` backfill provider. They run the
Expand Down Expand Up @@ -318,6 +335,56 @@ test('modern rollout projects into canonical ai_gateway_messages rows', async ()
}
})

// @ref LLP 0050 [tests]: capture-seam drop for backfill — a session whose
// recorded cwd is .hypignore-ignored is skipped before projecting or yielding
// any row, so `hyp backfill` never re-imports sessions ignored live (R1).
test('backfill skips a session whose cwd is .hypignore-ignored', async () => {
const env = await stageEnv()
try {
// modernConversation stamps meta.cwd = '/work/repo'.
await writeModernRollout(env, '2026/05/25/rollout-1.jsonl', modernConversation('sess-ignored'))

const provider = createCodexBackfillProvider({
homeDir: env.homeDir,
resolver: ignoringResolver('/work/repo'),
})
const { ctx, entries: logs } = runContext()
const { items } = await collect(provider.run(ctx))

assert.equal(items.length, 0, 'no rows yielded for an ignored session')
const drop = logs.find((e) => e.message === 'codex.backfill.usage_policy_drop')
assert.ok(drop, 'expected a usage_policy_drop log entry')
assert.equal(drop?.fields?.operation, 'usage_policy_drop')
assert.equal(drop?.fields?.conversation_id, 'sess-ignored')
assert.equal(drop?.fields?.governed_by, '/work/repo/.hypignore')
const scanDone = logs.find((e) => e.message === 'codex.backfill.scan_complete')
assert.equal(scanDone?.fields?.sessions_projected, 0)
assert.equal(scanDone?.fields?.sessions_ignored, 1)
} finally {
await env.cleanup()
}
})

test('backfill is unaffected when a different cwd is ignored', async () => {
const env = await stageEnv()
try {
await writeModernRollout(env, '2026/05/25/rollout-1.jsonl', modernConversation('sess-clean'))

const provider = createCodexBackfillProvider({
homeDir: env.homeDir,
// Ignore an unrelated directory; the session's '/work/repo' is clean.
resolver: ignoringResolver('/work/other'),
})
const { ctx } = runContext()
const { items } = await collect(provider.run(ctx))

assert.equal(items.length, 1, 'a non-ignored session still backfills')
assert.equal(value(items[0]).conversation_id, 'sess-clean')
} finally {
await env.cleanup()
}
})

test('token_count event folds per-turn usage (net of cache) onto the turn assistant message', async () => {
const env = await stageEnv()
try {
Expand Down
81 changes: 81 additions & 0 deletions test/plugins/codex-exchange-projector.test.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,92 @@
// @ts-check

import assert from 'node:assert/strict'
import path from 'node:path'
import test from 'node:test'

import {
createCodexExchangeProjector,
} from '../../hypaware-core/plugins-workspace/codex/src/exchange-projector.js'
import { createUsagePolicyResolver } from '../../src/core/usage-policy/index.js'

/**
* A real usage-policy resolver wired to an injected fs that reports exactly one
* governing `.hypignore` (class `ignore`) at `ignoredDir`. Mirrors how the
* @hypaware/claude projector's drop is tested (T2): exercise the actual shared
* matcher, not a hand-rolled stub.
* @ref LLP 0050 [tests]: the codex live projector's capture-seam drop
*
* @param {string} ignoredDir
*/
function ignoringResolver(ignoredDir) {
const hypignore = path.join(ignoredDir, '.hypignore')
return createUsagePolicyResolver({
existsSync: (p) => p === hypignore,
readFileSync: () => 'ignore\n',
})
}

// @ref LLP 0050 [tests]: capture-seam drop — an ignored cwd yields no rows so
// the gateway write guard persists nothing; a clean cwd is unaffected (R1/R2).
test('project() returns no projection when the exchange cwd is .hypignore-ignored', () => {
const projector = createCodexExchangeProjector({
env: {},
resolver: ignoringResolver('/work/ignored'),
})
const projection = projector.project(exchange({
path: '/v1/chat/completions',
request_body: JSON.stringify({
cwd: '/work/ignored/sub',
messages: [{ role: 'user', content: 'secret' }],
}),
response_body: JSON.stringify({ choices: [{ message: { role: 'assistant', content: 'ok' } }] }),
}), context())
assert.equal(projection, undefined)
})

test('project() is unaffected when the exchange cwd is not ignored', () => {
const projector = createCodexExchangeProjector({
env: {},
resolver: ignoringResolver('/work/ignored'),
})
const projection = /** @type {any} */ (projector.project(exchange({
path: '/v1/chat/completions',
request_body: JSON.stringify({
cwd: '/work/clean',
messages: [{ role: 'user', content: 'hi' }],
}),
response_body: JSON.stringify({ choices: [{ message: { role: 'assistant', content: 'ok' } }] }),
}), context()))
assert.ok(projection)
assert.deepEqual(projection.messages.map((/** @type {any} */ m) => m.role), ['user', 'assistant'])
})

test('project() emits a usage_policy_drop log on an ignored cwd', () => {
/** @type {Array<{ message: string, fields?: Record<string, unknown> }>} */
const infos = []
const projector = createCodexExchangeProjector({
env: {},
resolver: ignoringResolver('/work/ignored'),
})
const log = {
debug() {},
warn() {},
error() {},
/** @param {string} message @param {Record<string, unknown>=} fields */
info: (message, fields) => { infos.push({ message, fields }) },
}
projector.project(exchange({
path: '/v1/chat/completions',
request_body: JSON.stringify({
cwd: '/work/ignored',
messages: [{ role: 'user', content: 'secret' }],
}),
}), { log })
const drop = infos.find((e) => e.message === 'plugin.codex.usage_policy_drop')
assert.ok(drop, 'expected a usage_policy_drop log entry')
assert.equal(drop.fields?.operation, 'usage_policy_drop')
assert.equal(drop.fields?.governed_by, '/work/ignored/.hypignore')
})

test('match() accepts the three transports it owns and rejects others', () => {
const projector = createCodexExchangeProjector({ env: {} })
Expand Down