Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions hypaware-core/plugins-workspace/claude/src/backfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import {
import { pickLatestMatching, readSessionContext } from './session_context.js'
import { deriveRepoFromCwd } from './git_repo.js'
import { anthropicMessageAttributes } from './anthropic.js'
import { createUsagePolicyResolver } from '../../../../src/core/usage-policy/index.js'

/**
* @import { AiGatewayProjectedExchange, AiGatewayProjectedMessage, BackfillContribution, BackfillItem, BackfillProvenance, BackfillRunContext, JsonObject, PluginLogger } from '../../../../collectivus-plugin-kernel-types.js'
* @import { SessionContextRecord, TranscriptEntry } from './types.js'
* @import { UsagePolicyResolver } from '../../../../src/core/usage-policy/types.js'
*/

/**
Expand Down Expand Up @@ -66,6 +68,7 @@ const DAY_MS = 24 * 60 * 60 * 1000
* clientName?: string,
* pluginName?: string,
* deriveRepo?: (cwd: string | undefined) => Promise<{ git_remote?: string, repo_root?: string }>,
* resolver?: UsagePolicyResolver,
* }} opts
* @returns {BackfillContribution}
*/
Expand All @@ -78,14 +81,18 @@ export function createClaudeBackfillProvider(opts) {
// recover it by running git in the session's cwd at backfill time. Injectable
// so tests stub the git lookup and stay hermetic.
const deriveRepo = opts.deriveRepo ?? deriveRepoFromCwd
// One resolver per backfill run (LLP 0050): the per-cwd cache reflects disk at
// run time and is shared across the whole scan. Injectable for hermetic tests.
// @ref LLP 0050 [implements]: skip ignored sessions at the capture seam.
const resolver = opts.resolver ?? createUsagePolicyResolver()

return {
name: clientName,
plugin: pluginName,
datasets: [AI_GATEWAY_MESSAGES_DATASET],
summary: 'Import local Claude Code transcripts into ai_gateway_messages',
async *run(ctx) {
yield* runClaudeBackfill({ ctx, projectsDir, stateFile, clientName, deriveRepo })
yield* runClaudeBackfill({ ctx, projectsDir, stateFile, clientName, deriveRepo, resolver })
},
}
}
Expand All @@ -103,11 +110,12 @@ export function createClaudeBackfillProvider(opts) {
* stateFile: string,
* clientName: string,
* deriveRepo: (cwd: string | undefined) => Promise<{ git_remote?: string, repo_root?: string }>,
* resolver: UsagePolicyResolver,
* }} args
* @returns {AsyncGenerator<BackfillItem>}
*/
async function* runClaudeBackfill(args) {
const { ctx, projectsDir, stateFile, clientName, deriveRepo } = args
const { ctx, projectsDir, stateFile, clientName, deriveRepo, resolver } = args
const log = ctx.log
const window = resolveWindow(ctx)
// Many sessions share a cwd (the same repo, often the same checkout), and
Expand Down Expand Up @@ -166,11 +174,31 @@ async function* runClaudeBackfill(args) {

for (const [sessionId, sessionEntries] of groupBySession(entries)) {
const windowed = filterByWindow(sessionEntries, window)
const record = pickLatestMatching(sessionRecords, { sessionId, transcriptPath: filePath })

// @ref LLP 0050 [implements]: capture-seam drop for backfill. Skip an
// ignored session BEFORE projecting/writing it, else `hyp backfill` would
// silently re-import the exact sessions ignored live (LLP 0049#requirements
// R1). The cwd precedence mirrors projectedExchangeFromEntries (the
// hook-written record wins, else the first transcript line's cwd), so the
// session is tested on the same cwd the row would have carried.
const sessionCwd = record?.cwd ?? windowed.find((entry) => entry.cwd)?.cwd
if (sessionCwd && resolver.isIgnored(sessionCwd)) {
log.info('claude.backfill.usage_policy_drop', {
component: 'plugin.claude.backfill',
operation: 'usage_policy_drop',
session_id: sessionId,
governed_by: resolver.resolve(sessionCwd).governedBy,
status: 'ok',
})
continue
}

const exchange = await projectedExchangeFromEntries({
sessionId,
entries: windowed,
clientName,
record: pickLatestMatching(sessionRecords, { sessionId, transcriptPath: filePath }),
record,
agentMeta,
deriveRepo: deriveRepoCached,
})
Expand Down
29 changes: 29 additions & 0 deletions hypaware-core/plugins-workspace/claude/src/projector.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ import {
pickLatestMatching,
readSessionContext,
} from './session_context.js'
import { createUsagePolicyResolver } from '../../../../src/core/usage-policy/index.js'

/**
* @import { AiGatewayExchangeInput, AiGatewayExchangeProjector, AiGatewayProjectedExchange, AiGatewayProjectedMessage, AiGatewayUpstreamPreset, JsonObject } from '../../../../collectivus-plugin-kernel-types.js'
* @import { TranscriptEntry } from './types.js'
* @import { UsagePolicyResolver } from '../../../../src/core/usage-policy/types.js'
*/

/**
Expand Down Expand Up @@ -80,6 +82,7 @@ import {
* projectsDir?: string,
* clientName?: string,
* logger?: { warn(message: string, fields?: Record<string, unknown>): void, debug?: (m: string, f?: Record<string, unknown>) => void },
* resolver?: UsagePolicyResolver,
* }} opts
* @returns {AiGatewayExchangeProjector}
*/
Expand All @@ -89,6 +92,11 @@ export function createClaudeExchangeProjector(opts) {
const clientName = opts.clientName ?? 'claude'
const logger = opts.logger
const sessionContextCache = createSessionContextCache()
// One resolver per projector (per daemon run): the per-cwd cache rides the
// projector's lifetime so the capture hot path adds no unbounded fs work.
// @ref LLP 0050 [implements]: the .hypignore capture-seam drop lives in the
// client adapter, the only place that resolves a cwd; injectable for tests.
const resolver = opts.resolver ?? createUsagePolicyResolver()

return {
name: 'claude-anthropic-messages',
Expand Down Expand Up @@ -152,6 +160,27 @@ export function createClaudeExchangeProjector(opts) {
sessionId,
})
: undefined

// @ref LLP 0050 [implements]: capture-seam drop. Once the exchange's cwd
// is resolved, an ancestor `.hypignore` that resolves to `ignore` means
// this exchange is never recorded: return no rows BEFORE building any, so
// the gateway source's write guard (`if (messageRows.length > 0)`)
// persists nothing. The response has already streamed to the client, so
// the live LLM call is untouched (LLP 0049#requirements R2). Returning
// `undefined` is this projector's "no rows" signal (the dispatcher treats
// it, and an empty `messages`, identically): a literal `[]` is not a valid
// projection and would log a spurious invalid-output warning.
const cwd = sessionContextRecord?.cwd
if (cwd && resolver.isIgnored(cwd)) {
ctx.log.info('plugin.claude.usage_policy_drop', {
component: 'claude',
operation: 'usage_policy_drop',
exchange_id: input.exchange_id,
governed_by: resolver.resolve(cwd).governedBy,
})
return undefined
}

const transcriptEntries = sessionId
? await loadTranscriptSafe({
projectsDir,
Expand Down
Loading