diff --git a/src/agent/agentLoop.ts b/src/agent/agentLoop.ts index 86f6567..b97c0e2 100644 --- a/src/agent/agentLoop.ts +++ b/src/agent/agentLoop.ts @@ -1,7 +1,7 @@ import type { Logger } from 'pino'; -import { logErr, withDurationMs } from '../logging/logger.js'; +import { withDurationMs } from '../logging/logger.js'; import { isRecord } from '../util/guards.js'; -import { parseLenientTopLevelJson } from '../util/json.js'; +import { parseLenientOrNull } from '../util/json.js'; import type { ChatMessage, LlmClient } from '../llm/types.js'; import type { ConversationTurn } from '../storage/conversationStore.js'; import type { ToolRegistry, ToolResult } from './tools/types.js'; @@ -39,11 +39,11 @@ export async function runAgentLoop(opts: { const { content } = await withDurationMs(opts.log, 'agent.chat_completions', () => opts.llm.chatCompletions({ messages, temperature: 0.2 }), ); - const parsed = safeJsonParse({ raw: content, log: opts.log, context: 'agent.response_parse' }); + const parsed = parseLenientOrNull({ raw: content, log: opts.log, context: 'agent.response_parse' }); if (!isRecord(parsed)) { if (looksLikeToolCalls(content) && !repairAttempted) { const repaired = await repairInvalidToolCalls({ llm: opts.llm, messages, badText: content }); - const repairedParsed = safeJsonParse({ raw: repaired, log: opts.log, context: 'agent.response_repair_parse' }); + const repairedParsed = parseLenientOrNull({ raw: repaired, log: opts.log, context: 'agent.response_repair_parse' }); if (isRecord(repairedParsed)) { const toolCalls2 = normalizeToolCalls(repairedParsed['tool_calls'], opts.log, 'agent.tool_calls_repair_parse'); if (Array.isArray(toolCalls2) && toolCalls2.length > 0) { @@ -72,7 +72,7 @@ export async function runAgentLoop(opts: { } if (looksLikeToolCalls(reply) && !repairAttempted) { const repaired = await repairInvalidToolCalls({ llm: opts.llm, messages, badText: reply }); - const repairedParsed = safeJsonParse({ raw: repaired, log: opts.log, context: 'agent.reply_repair_parse' }); + const repairedParsed = parseLenientOrNull({ raw: repaired, log: opts.log, context: 'agent.reply_repair_parse' }); if (isRecord(repairedParsed)) { const toolCalls2 = normalizeToolCalls( repairedParsed['tool_calls'], @@ -123,14 +123,15 @@ async function repairInvalidToolCalls(opts: { llm: LlmClient; messages: ChatMess function normalizeToolCalls(v: unknown, log: Logger, context: string): unknown[] | null { if (Array.isArray(v)) return v; if (typeof v !== 'string') return null; - const parsed = safeJsonParse({ raw: v, log, context }); + const parsed = parseLenientOrNull({ raw: v, log, context }); if (Array.isArray(parsed)) return parsed as unknown[]; if (isRecord(parsed) && Array.isArray(parsed['tool_calls'])) return parsed['tool_calls'] as unknown[]; return null; } function extractToolCallsFromText(raw: string, log: Logger, context: string): unknown[] | null { - const parsed = safeJsonParse({ raw, log, context }); + if (!looksLikeToolCalls(raw)) return null; + const parsed = parseLenientOrNull({ raw, log, context }); if (isRecord(parsed) && Array.isArray(parsed['tool_calls'])) return parsed['tool_calls'] as unknown[]; return null; } @@ -150,29 +151,6 @@ function compressToMaxLines(text: string, maxLines: number): string { return [...head, '... (truncated; share vendor/platform + timestamps for deeper triage)'].join('\n'); } -const jsonParseWarnAt: { nextAtMs: number } = { nextAtMs: 0 }; -function warnJsonParseDegraded(opts: { log: Logger; context: string; raw: string; err: unknown }): void { - const now = Date.now(); - if (now < jsonParseWarnAt.nextAtMs) return; - jsonParseWarnAt.nextAtMs = now + 10 * 60_000; - const snippet = opts.raw.length > 800 ? `${opts.raw.slice(0, 800)}...` : opts.raw; - opts.log.warn( - { - degradedContext: opts.context, - degradedSnippet: snippet, - ...logErr(opts.err), - }, - 'agent JSON parse degraded (falling back)', - ); -} - -function safeJsonParse(opts: { raw: string; log: Logger; context: string }): unknown { - const v = parseLenientTopLevelJson(opts.raw); - if (v !== null) return v; - warnJsonParseDegraded({ log: opts.log, context: opts.context, raw: opts.raw, err: new Error('unparseable JSON') }); - return null; -} - async function runToolCalls(opts: { tools: ToolRegistry; toolCalls: unknown[]; diff --git a/src/agent/prompts/agentPrompt.ts b/src/agent/prompts/agentPrompt.ts index 4baa5c7..701b885 100644 --- a/src/agent/prompts/agentPrompt.ts +++ b/src/agent/prompts/agentPrompt.ts @@ -81,6 +81,7 @@ function flowPlaybooksCompressed(): string[] { '', 'Examples (not exhaustive):', '- topTalkersByBytes: prefer topSrcDisplayNames in prose when set; includeDistinctPods only for pod cardinality.', + '- topExternalDestinationsByBytes: for who egresses to a given external IP, use topSources; prefer topSrcDisplayNames per source when set, else srcIp.', '- namespaceTrafficMatrix: compare internal vs external bytes by namespace.', '- egressBytesVsBaseline vs egressSpikeDrilldown: table-only baseline vs per-source top destinations ' + 'in one call.', diff --git a/src/agent/tools/handlers/pickAggregatableField.ts b/src/agent/tools/handlers/pickAggregatableField.ts new file mode 100644 index 0000000..a3d33ee --- /dev/null +++ b/src/agent/tools/handlers/pickAggregatableField.ts @@ -0,0 +1,34 @@ +import type { SearchClient } from '../../../search/types.js'; + +export async function pickAggregatableField({ + client, + index, + field, +}: { + client: SearchClient; + index: string; + field: string | undefined; +}): Promise { + if (!field) return undefined; + const keyword = `${field}.keyword`; + const resp = await client.fieldCaps({ + index, + fields: [field, keyword], + ignore_unavailable: true, + allow_no_indices: true, + }); + const caps = (resp.body as { fields?: Record }).fields ?? {}; + + const aggregatable = (f: string): boolean => { + const entry = caps[f]; + if (!entry || typeof entry !== 'object') return false; + return Object.values(entry as Record).some((t) => { + if (!t || typeof t !== 'object') return false; + return (t as { aggregatable?: unknown }).aggregatable === true; + }); + }; + + if (aggregatable(field)) return field; + if (aggregatable(keyword)) return keyword; + return undefined; +} diff --git a/src/agent/tools/handlers/topExternalDestinationsByBytes.ts b/src/agent/tools/handlers/topExternalDestinationsByBytes.ts index 1271584..be953a2 100644 --- a/src/agent/tools/handlers/topExternalDestinationsByBytes.ts +++ b/src/agent/tools/handlers/topExternalDestinationsByBytes.ts @@ -1,12 +1,13 @@ -import type { Client } from '@opensearch-project/opensearch'; import { externalDestinationIpBool } from '../../../opensearch/queries/destinationIp.js'; +import type { SearchClient } from '../../../search/types.js'; import { getNumber, getString } from '../../../util/guards.js'; -import type { AgentPolicy } from '../../policy.js'; +import { clampBucketSize, type AgentPolicy } from '../../policy.js'; import { getAggBuckets, getNested } from '../helpers.js'; import { resolveAggToolContext } from './common.js'; +import { pickAggregatableField } from './pickAggregatableField.js'; export async function topExternalDestinationsByBytes( - ctx: { client: Client; policy: AgentPolicy; defaultIndex: string }, + ctx: { client: SearchClient; policy: AgentPolicy; defaultIndex: string }, args: Record, ): Promise { const { index, fields, minutesBack, size } = await resolveAggToolContext({ @@ -16,8 +17,27 @@ export async function topExternalDestinationsByBytes( defaultSize: 10, }); - const srcPodField = fields.podNameField; - const srcNsField = fields.clientNamespaceField; + const pick = (field: string | undefined) => + pickAggregatableField({ client: ctx.client, index, field }); + + const [podNameAggField, nsAggField, displayNameAggField, dstPortAggField] = await Promise.all([ + pick(fields.podNameField), + pick(fields.clientNamespaceField), + pick(fields.srcDisplayNameField), + pick(fields.dstPortField), + ]); + const displayAggField = + displayNameAggField && displayNameAggField !== podNameAggField && displayNameAggField !== nsAggField + ? displayNameAggField + : undefined; + const dstPortTermsField = dstPortAggField ?? fields.dstPortField; + + const bySrcLeafAggs: Record = { + sum_bytes: { sum: { field: fields.bytesField } }, + ...(podNameAggField ? { src_top_pods: { terms: { field: podNameAggField, size: 1 } } } : {}), + ...(nsAggField ? { src_top_namespaces: { terms: { field: nsAggField, size: 1 } } } : {}), + ...(displayAggField ? { src_top_display_names: { terms: { field: displayAggField, size: 1 } } } : {}), + }; const { body } = await ctx.client.search({ index, @@ -36,9 +56,17 @@ export async function topExternalDestinationsByBytes( terms: { field: fields.dstIpField, size, order: { sum_bytes: 'desc' } }, aggs: { sum_bytes: { sum: { field: fields.bytesField } }, - ...(fields.dstPortField ? { top_ports: { terms: { field: fields.dstPortField, size: 3 } } } : {}), - ...(srcPodField ? { top_src_pods: { terms: { field: srcPodField, size: 3 } } } : {}), - ...(srcNsField ? { top_src_namespaces: { terms: { field: srcNsField, size: 3 } } } : {}), + top_ports: { terms: { field: dstPortTermsField, size: 3 } }, + ...(podNameAggField ? { top_src_pods: { terms: { field: podNameAggField, size: 3 } } } : {}), + ...(nsAggField ? { top_src_namespaces: { terms: { field: nsAggField, size: 3 } } } : {}), + by_src: { + terms: { + field: fields.srcIpField, + size: clampBucketSize(5, ctx.policy), + order: { sum_bytes: 'desc' }, + }, + aggs: bySrcLeafAggs, + }, }, }, }, @@ -65,7 +93,35 @@ export async function topExternalDestinationsByBytes( namespace: getString(nb['key']), flows: getNumber(nb['doc_count']), })), + topSources: getAggBuckets(b, ['by_src', 'buckets']).map((sb) => ({ + srcIp: getString(sb['key']), + bytes: getNumber(getNested(sb, ['sum_bytes', 'value'])), + flows: getNumber(sb['doc_count']), + ...(podNameAggField + ? { + topPodNames: getAggBuckets(sb, ['src_top_pods', 'buckets']).map((pb) => ({ + podName: getString(pb['key']), + docCount: getNumber(pb['doc_count']), + })), + } + : {}), + ...(nsAggField + ? { + topNamespaces: getAggBuckets(sb, ['src_top_namespaces', 'buckets']).map((nb) => ({ + namespace: getString(nb['key']), + docCount: getNumber(nb['doc_count']), + })), + } + : {}), + ...(displayAggField + ? { + topSrcDisplayNames: getAggBuckets(sb, ['src_top_display_names', 'buckets']).map((db) => ({ + displayName: getString(db['key']), + docCount: getNumber(db['doc_count']), + })), + } + : {}), + })), })), }; } - diff --git a/src/agent/tools/handlers/topTalkersByBytes.ts b/src/agent/tools/handlers/topTalkersByBytes.ts index a5fc273..8f58f83 100644 --- a/src/agent/tools/handlers/topTalkersByBytes.ts +++ b/src/agent/tools/handlers/topTalkersByBytes.ts @@ -3,6 +3,7 @@ import type { SearchClient } from '../../../search/types.js'; import type { AgentPolicy } from '../../policy.js'; import { getAggBuckets, getNested } from '../helpers.js'; import { resolveAggToolContext } from './common.js'; +import { pickAggregatableField } from './pickAggregatableField.js'; export async function topTalkersByBytes( ctx: { client: SearchClient; policy: AgentPolicy; defaultIndex: string }, @@ -16,35 +17,12 @@ export async function topTalkersByBytes( }); const includeDistinctPods = args.includeDistinctPods === true; - const pickAggField = async (field: string | undefined): Promise => { - if (!field) return undefined; - const keyword = `${field}.keyword`; - const resp = await ctx.client.fieldCaps({ - index, - fields: [field, keyword], - ignore_unavailable: true, - allow_no_indices: true, - }); - const caps = (resp.body as { fields?: Record }).fields ?? {}; - - const isAggregatable = (f: string): boolean => { - const entry = caps[f]; - if (!entry || typeof entry !== 'object') return false; - return Object.values(entry as Record).some((t) => { - if (!t || typeof t !== 'object') return false; - return (t as { aggregatable?: unknown }).aggregatable === true; - }); - }; - - if (isAggregatable(field)) return field; - if (isAggregatable(keyword)) return keyword; - return undefined; - }; - + const pick = (field: string | undefined) => + pickAggregatableField({ client: ctx.client, index, field }); const [podNameAggField, nsAggField, displayNameAggField] = await Promise.all([ - pickAggField(fields.podNameField), - pickAggField(fields.clientNamespaceField), - pickAggField(fields.srcDisplayNameField), + pick(fields.podNameField), + pick(fields.clientNamespaceField), + pick(fields.srcDisplayNameField), ]); const displayAggField = displayNameAggField && displayNameAggField !== podNameAggField && displayNameAggField !== nsAggField @@ -53,19 +31,13 @@ export async function topTalkersByBytes( const bySrcAggs: Record = { sum_bytes: { sum: { field: fields.bytesField } }, + ...(podNameAggField ? { top_pods: { terms: { field: podNameAggField, size: 3 } } } : {}), + ...(podNameAggField && includeDistinctPods + ? { distinct_pods: { cardinality: { field: podNameAggField, precision_threshold: 2000 } } } + : {}), + ...(nsAggField ? { top_namespaces: { terms: { field: nsAggField, size: 3 } } } : {}), + ...(displayAggField ? { top_display_names: { terms: { field: displayAggField, size: 3 } } } : {}), }; - if (podNameAggField) { - bySrcAggs.top_pods = { terms: { field: podNameAggField, size: 3 } }; - if (includeDistinctPods) { - bySrcAggs.distinct_pods = { cardinality: { field: podNameAggField, precision_threshold: 2000 } }; - } - } - if (nsAggField) { - bySrcAggs.top_namespaces = { terms: { field: nsAggField, size: 3 } }; - } - if (displayAggField) { - bySrcAggs.top_display_names = { terms: { field: displayAggField, size: 3 } }; - } const { body } = await ctx.client.search({ index, @@ -93,34 +65,37 @@ export async function topTalkersByBytes( return { index, minutesBack, - talkers: buckets.map((b) => { - const row: Record = { - srcIp: getString(b['key']), - bytes: getNumber(getNested(b, ['sum_bytes', 'value'])), - docCount: getNumber(b['doc_count']), - }; - if (podNameAggField && includeDistinctPods) { - row.distinctPodNamesApprox = getNumber(getNested(b, ['distinct_pods', 'value'])); - } - if (podNameAggField) { - row.topPodNames = getAggBuckets(b, ['top_pods', 'buckets']).map((pb) => ({ - podName: getString(pb['key']), - docCount: getNumber(pb['doc_count']), - })); - } - if (nsAggField) { - row.topNamespaces = getAggBuckets(b, ['top_namespaces', 'buckets']).map((nb) => ({ - namespace: getString(nb['key']), - docCount: getNumber(nb['doc_count']), - })); - } - if (displayAggField) { - row.topSrcDisplayNames = getAggBuckets(b, ['top_display_names', 'buckets']).map((db) => ({ - displayName: getString(db['key']), - docCount: getNumber(db['doc_count']), - })); - } - return row; - }), + talkers: buckets.map((b) => ({ + srcIp: getString(b['key']), + bytes: getNumber(getNested(b, ['sum_bytes', 'value'])), + docCount: getNumber(b['doc_count']), + ...(podNameAggField && includeDistinctPods + ? { distinctPodNamesApprox: getNumber(getNested(b, ['distinct_pods', 'value'])) } + : {}), + ...(podNameAggField + ? { + topPodNames: getAggBuckets(b, ['top_pods', 'buckets']).map((pb) => ({ + podName: getString(pb['key']), + docCount: getNumber(pb['doc_count']), + })), + } + : {}), + ...(nsAggField + ? { + topNamespaces: getAggBuckets(b, ['top_namespaces', 'buckets']).map((nb) => ({ + namespace: getString(nb['key']), + docCount: getNumber(nb['doc_count']), + })), + } + : {}), + ...(displayAggField + ? { + topSrcDisplayNames: getAggBuckets(b, ['top_display_names', 'buckets']).map((db) => ({ + displayName: getString(db['key']), + docCount: getNumber(db['doc_count']), + })), + } + : {}), + })), }; } diff --git a/src/agent/tools/toolSpecs.ts b/src/agent/tools/toolSpecs.ts index a8b6626..4d0ff2e 100644 --- a/src/agent/tools/toolSpecs.ts +++ b/src/agent/tools/toolSpecs.ts @@ -194,7 +194,8 @@ export const coreToolSpecs: readonly CoreToolSpec[] = [ }, { name: 'topExternalDestinationsByBytes', - description: 'Top external destination IPs by egress bytes (RFC1918/CGNAT excluded) with top source pods/namespaces.', + description: + 'Top external destination IPs by egress bytes (non-RFC1918/CGNAT). Per dst: topSources (srcIp, bytes, optional pod/ns/display from index), plus dst-level topSourcePods/topSourceNamespaces.', argsSchema: { type: 'object', properties: { diff --git a/src/chat/adapters/matrix.ts b/src/chat/adapters/matrix.ts index debbefb..798702b 100644 --- a/src/chat/adapters/matrix.ts +++ b/src/chat/adapters/matrix.ts @@ -7,7 +7,7 @@ import { type Room, RoomEvent, } from 'matrix-js-sdk'; -import { getLogger } from '../../logging/logger.js'; +import { getLogger, logErr } from '../../logging/logger.js'; import { createMatrixJsSdkLogger, type MatrixSdkLevel } from '../../logging/matrixSdkLogger.js'; import type { ChatEvent } from '../types.js'; @@ -79,7 +79,6 @@ export async function startMatrixAdapter(opts: { const roomId = room?.roomId ?? event.getRoomId(); if (!roomId) return; const eventId = event.getId() ?? ''; - /** Thread root for MSC threads; `main` sentinel for main-timeline (memory + non-threaded replies). */ const threadKey = event.threadRootId ?? 'main'; const normalized: ChatEvent = { type: 'message', @@ -91,7 +90,7 @@ export async function startMatrixAdapter(opts: { }, user: { id: sender }, text: body, - ts: new Date().toISOString(), + ts: new Date(event.getTs()).toISOString(), ...(eventId ? { eventId } : {}), }; await opts.onEvent(normalized); @@ -103,7 +102,7 @@ export async function startMatrixAdapter(opts: { await client.joinRoom(room.roomId); log.info({ roomId: room.roomId }, 'matrix joined invited room'); } catch (e) { - log.warn({ roomId: room.roomId, err: e }, 'matrix failed to join invited room'); + log.warn({ roomId: room.roomId, ...logErr(e) }, 'matrix failed to join invited room'); } }; @@ -118,7 +117,7 @@ export async function startMatrixAdapter(opts: { log.info({ roomId: opts.defaultRoomId }, 'matrix joined default room'); } catch (e) { log.error( - { roomId: opts.defaultRoomId, botUserId: client.getUserId() ?? undefined, err: e }, + { roomId: opts.defaultRoomId, botUserId: client.getUserId() ?? undefined, ...logErr(e) }, 'matrix cannot join default room; invite the bot user, then restart', ); } diff --git a/src/chat/router.ts b/src/chat/router.ts index 29e5992..67e5810 100644 --- a/src/chat/router.ts +++ b/src/chat/router.ts @@ -2,14 +2,18 @@ import type { Notifier } from '../notify/notifier.js'; import type { AgentRuntime } from '../agent/runtime.js'; import { runWithLogContextAsync } from '../logging/context.js'; import { getLogger, logErr } from '../logging/logger.js'; +import { chatMessageServerTimeMs } from '../util/chatMessageServerTimeMs.js'; import type { ChatEvent, ChatPost } from './types.js'; const log = getLogger({ component: 'chat.router' }); +const INGEST_SKEW_MS = 5_000; + export type ChatRouterDeps = { notifier: Notifier; agent: AgentRuntime; status: () => Promise; + ingestOpenedAtMs?: number; }; export class ChatRouter { @@ -21,6 +25,15 @@ export class ChatRouter { const trimmed = evt.text.trim(); if (!trimmed) return; + const opened = this.deps.ingestOpenedAtMs; + if (opened !== undefined) { + const t = chatMessageServerTimeMs(evt.ts); + if (t !== null && t < opened - INGEST_SKEW_MS) { + log.debug({ platform: evt.platform, t, opened }, 'ingest skip stale'); + return; + } + } + return runWithLogContextAsync( { platform: evt.platform, @@ -49,7 +62,7 @@ export class ChatRouter { await this.deps.notifier.post(this.replyTo(evt, resp.text)); } catch (e) { - log.error(e instanceof Error ? { err: e } : { ...logErr(e) }, 'handleEvent failed'); + log.error(logErr(e), 'handleEvent failed'); } }, ); diff --git a/src/insights/engine.ts b/src/insights/engine.ts index 150e78e..4686faf 100644 --- a/src/insights/engine.ts +++ b/src/insights/engine.ts @@ -1,8 +1,8 @@ import { randomUUID } from 'node:crypto'; import type { KaytooConfig } from '../config.js'; -import type { Logger } from 'pino'; import { getLogger, logErr } from '../logging/logger.js'; import { runWithLogContextAsync } from '../logging/context.js'; +import { createThrottle } from '../logging/throttle.js'; import { createSearchClient } from '../search/client.js'; import { waitForOpenSearchFieldMapping } from '../opensearch/waitForFieldMapping.js'; import { queryPortscanCandidates, queryTopEgressBySource } from '../opensearch/queries/index.js'; @@ -62,7 +62,7 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi includeDebugBodies: config.logging.includeDebugBodies, }); const dedupe = new DedupeStore(config.behavior.dedupeTtlSeconds * 1000); - const warnAt = new Map(); + const shouldWarnDegraded = createThrottle(10 * 60_000); let timer: NodeJS.Timeout | undefined; let inFlight = false; @@ -98,8 +98,12 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi DetectionFetchResult, ]); - if (!alerting.ok && alerting.warning) rateLimitedWarn(log, warnAt, 'alerting', alerting.warning); - if (!ad.ok && ad.warning) rateLimitedWarn(log, warnAt, 'ad', ad.warning); + if (!alerting.ok && alerting.warning && shouldWarnDegraded('alerting')) { + log.warn({ degradedKey: 'alerting', degradedMsg: alerting.warning }, 'insights degraded'); + } + if (!ad.ok && ad.warning && shouldWarnDegraded('ad')) { + log.warn({ degradedKey: 'ad', degradedMsg: ad.warning }, 'insights degraded'); + } const backendFindings = [...alerting.findings, ...ad.findings]; if (backendFindings.length > 0) { @@ -240,11 +244,3 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi }, }; } - -function rateLimitedWarn(log: Logger, map: Map, key: string, msg: string): void { - const now = Date.now(); - const next = map.get(key) ?? 0; - if (now < next) return; - log.warn({ degradedKey: key, degradedMsg: msg }, 'insights degraded'); - map.set(key, now + 10 * 60_000); -} diff --git a/src/insights/opensearchDetections.ts b/src/insights/opensearchDetections.ts index 2355a96..597ef9e 100644 --- a/src/insights/opensearchDetections.ts +++ b/src/insights/opensearchDetections.ts @@ -1,7 +1,8 @@ import type { SearchClient } from '../search/types.js'; import type { Finding } from '../detectors/types.js'; import { getNumber, getString, isRecord } from '../util/guards.js'; -import { getLogger, logErr } from '../logging/logger.js'; +import { getLogger } from '../logging/logger.js'; +import { parseJsonOrNull } from '../util/json.js'; export type DetectionFetchResult = { ok: boolean; @@ -14,17 +15,7 @@ export type DetectionFetchResult = { const ALERT_INDEX_PATTERNS = ['.opensearch-alerting-alerts*', '.opendistro-alerting-alerts*']; const AD_RESULT_INDEX_PATTERNS = ['.opensearch-anomaly-results*', '.opendistro-anomaly-results*']; -const parseWarnAt: { nextAtMs: number } = { nextAtMs: 0 }; -function warnParseDegraded(ctx: string, raw: string, err: unknown): void { - const now = Date.now(); - if (now < parseWarnAt.nextAtMs) return; - parseWarnAt.nextAtMs = now + 10 * 60_000; - const snippet = raw.length > 800 ? `${raw.slice(0, 800)}...` : raw; - getLogger({ component: 'insights.opensearchDetections' }).warn( - { degradedContext: ctx, degradedSnippet: snippet, ...logErr(err) }, - 'OpenSearch detections parse degraded', - ); -} +const detectionsLog = getLogger({ component: 'insights.opensearchDetections' }); function shardsTotal(body: unknown): number { if (!body || typeof body !== 'object') return 0; @@ -130,14 +121,7 @@ type Hit = { _id?: unknown; _index?: unknown; _source?: unknown }; function getHits(body: unknown): Hit[] { const normalized: unknown = typeof body === 'string' - ? (() => { - try { - return JSON.parse(body) as unknown; - } catch (e) { - warnParseDegraded('opensearch.search.body_string', body, e); - return null; - } - })() + ? parseJsonOrNull({ raw: body, context: 'opensearch.search.body_string', log: detectionsLog }) : body; if (!normalized || typeof normalized !== 'object') return []; const hitsObj = (normalized as Record)['hits']; diff --git a/src/llm/openaiCompat.ts b/src/llm/openaiCompat.ts index 52d54e5..d56dca1 100644 --- a/src/llm/openaiCompat.ts +++ b/src/llm/openaiCompat.ts @@ -3,7 +3,7 @@ import type { KaytooConfig } from '../config.js'; import type { Finding } from '../detectors/types.js'; import { getLogger, withDurationMs } from '../logging/logger.js'; import { isRecord } from '../util/guards.js'; -import { parseLenientTopLevelJson } from '../util/json.js'; +import { parseLenientOrNull } from '../util/json.js'; import { sleepMs } from '../util/sleep.js'; import { buildSlackSummaryPrompt } from './prompt.js'; import type { ChatMessage, LlmClient } from './types.js'; @@ -45,11 +45,27 @@ async function resolveChatUrl(baseUrl: string, apiKey: string, log: PinoLogger): return `${baseUrl}/v1/chat/completions`; } +const chatUrlCache = new Map>(); + +/** Test-only: clear the resolved-chat-URL cache so probing tests stay independent. */ +export function resetChatUrlCacheForTests(): void { + chatUrlCache.clear(); +} + +function getChatUrl(baseUrl: string, apiKey: string, log: PinoLogger): Promise { + const key = `${baseUrl}|${apiKey}`; + const cached = chatUrlCache.get(key); + if (cached) return cached; + const p = resolveChatUrl(baseUrl, apiKey, log); + chatUrlCache.set(key, p); + return p; +} + export function createOpenAiCompatClient(config: OpenAiCompatConfig): LlmClient { const log = getLogger({ component: 'llm' }); const baseUrl = config.baseUrl.replace(/\/+$/, ''); const includeBodies = config.includeDebugBodies ?? false; - const chatUrlPromise = resolveChatUrl(baseUrl, config.apiKey, log); + const chatUrlPromise = getChatUrl(baseUrl, config.apiKey, log); return { async chatCompletions(input: { messages: ChatMessage[]; temperature?: number; maxTokens?: number }) { @@ -104,15 +120,8 @@ export function createOpenAiCompatClient(config: OpenAiCompatConfig): LlmClient const { content } = await this.chatCompletions({ messages, temperature: 0.2 }); - const parsed = parseLenientTopLevelJson(content); - if (parsed === null) { - const snippet = content.length > 800 ? `${content.slice(0, 800)}...` : content; - getLogger({ component: 'llm' }).warn( - { degradedContext: 'llm.slack_summary_parse', degradedSnippet: snippet }, - 'LLM JSON parse degraded', - ); - throw new Error(`LLM returned non-JSON content:\n${content}`); - } + const parsed = parseLenientOrNull({ raw: content, context: 'llm.slack_summary_parse', log }); + if (parsed === null) throw new Error(`LLM returned non-JSON content:\n${content}`); if (!isRecord(parsed)) throw new Error('LLM JSON missing object'); const postRaw = parsed['post']; if (typeof postRaw !== 'boolean') throw new Error('LLM JSON missing boolean "post"'); diff --git a/src/logging/throttle.ts b/src/logging/throttle.ts new file mode 100644 index 0000000..0d022db --- /dev/null +++ b/src/logging/throttle.ts @@ -0,0 +1,10 @@ +/** Returns a gate that yields `true` at most once per `everyMs` per key, `false` otherwise. */ +export function createThrottle(everyMs: number): (key?: string) => boolean { + const at = new Map(); + return (key = '') => { + const now = Date.now(); + if (now < (at.get(key) ?? 0)) return false; + at.set(key, now + everyMs); + return true; + }; +} diff --git a/src/main.ts b/src/main.ts index f6e672c..d2cb47f 100644 --- a/src/main.ts +++ b/src/main.ts @@ -153,16 +153,6 @@ if (config.output === 'console') { }) : undefined; - if (config.mattermost) { - startMattermostAdapter({ - baseUrl: config.mattermost.url, - token: config.mattermost.token, - channelId: config.mattermost.channelId, - ...(config.mattermost.botUserId ? { botUserId: config.mattermost.botUserId } : {}), - onEvent, - }); - } - const notifier = createMultiNotifier( notifierBundle({ slack: slackNotifier, matrix: matrixNotifier, mattermost: mattermostNotifier }), ); @@ -176,9 +166,24 @@ if (config.output === 'console') { }); const agent = await createAgentRuntime({ config }); - const router = new ChatRouter({ notifier, agent, status: async () => 'kaytoo: ok' }); + const router = new ChatRouter({ + notifier, + agent, + status: async () => 'kaytoo: ok', + ingestOpenedAtMs: Date.now(), + }); routerCtl.resolve(router); + if (config.mattermost) { + startMattermostAdapter({ + baseUrl: config.mattermost.url, + token: config.mattermost.token, + channelId: config.mattermost.channelId, + ...(config.mattermost.botUserId ? { botUserId: config.mattermost.botUserId } : {}), + onEvent, + }); + } + log.info( { slack: Boolean(slackNotifier), matrix: Boolean(matrixNotifier), mattermost: Boolean(mattermostNotifier) }, 'chat adapters ready', diff --git a/src/opensearch/fieldCaps.ts b/src/opensearch/fieldCaps.ts index 9b155d1..3267c7b 100644 --- a/src/opensearch/fieldCaps.ts +++ b/src/opensearch/fieldCaps.ts @@ -1,4 +1,5 @@ import { getLogger } from '../logging/logger.js'; +import { createThrottle } from '../logging/throttle.js'; import { z } from 'zod'; import type { SearchClient } from '../search/types.js'; import { candidateFields } from './fieldCandidates.js'; @@ -52,11 +53,9 @@ function decodeFieldCapsFields(fields: Record): FieldCapsByFiel return out; } -const warnAt: { nextAtMs: number } = { nextAtMs: 0 }; +const shouldWarnUnexpectedShape = createThrottle(10 * 60_000); function warnUnexpectedShape(msg: string): void { - const now = Date.now(); - if (now < warnAt.nextAtMs) return; - warnAt.nextAtMs = now + 10 * 60_000; + if (!shouldWarnUnexpectedShape()) return; getLogger({ component: 'opensearch.fieldCaps' }).warn({ degradedMsg: msg }, 'unexpected fieldCaps response shape'); } diff --git a/src/storage/conversationStore.ts b/src/storage/conversationStore.ts index 827236c..f99c957 100644 --- a/src/storage/conversationStore.ts +++ b/src/storage/conversationStore.ts @@ -1,6 +1,7 @@ import { mkdir, readFile, rename, writeFile } from 'node:fs/promises'; import { dirname } from 'node:path'; import { getLogger, logErr } from '../logging/logger.js'; +import { createThrottle } from '../logging/throttle.js'; import { parseJsonOrNull } from '../util/json.js'; export type ConversationTurn = { role: 'user' | 'assistant'; content: string }; @@ -118,12 +119,10 @@ export function createFileConversationStore(opts: { /** Non-persistent in-memory store (single process). */ export function createMemoryConversationStore(opts: { ttlMs: number }): ConversationStore { const map = new Map(); - const gc = { nextAtMs: 0 }; + const shouldPrune = createThrottle(60_000); const maybePrune = (): void => { - const now = Date.now(); - if (now < gc.nextAtMs) return; - gc.nextAtMs = now + 60_000; - const cutoff = now - opts.ttlMs; + if (!shouldPrune()) return; + const cutoff = Date.now() - opts.ttlMs; for (const [k, e] of map) { if (!isStoredConversation(e) || e.updatedAtMs < cutoff) map.delete(k); } diff --git a/src/util/chatMessageServerTimeMs.ts b/src/util/chatMessageServerTimeMs.ts new file mode 100644 index 0000000..5d7c705 --- /dev/null +++ b/src/util/chatMessageServerTimeMs.ts @@ -0,0 +1,16 @@ +const SLACK_TS = /^\d+\.\d+$/; + +export function chatMessageServerTimeMs(ts: string): number | null { + const t = ts.trim(); + if (!t) return null; + if (SLACK_TS.test(t)) { + const [sec, frac = '0'] = t.split('.'); + const s = Number.parseInt(sec!, 10); + if (!Number.isFinite(s)) return null; + const micro = Number.parseInt((frac + '000000').slice(0, 6), 10); + if (!Number.isFinite(micro)) return null; + return s * 1000 + micro / 1000; + } + const parsed = Date.parse(t); + return Number.isFinite(parsed) ? parsed : null; +} diff --git a/src/util/json.ts b/src/util/json.ts index 5e2c2ad..7f5ddaa 100644 --- a/src/util/json.ts +++ b/src/util/json.ts @@ -2,10 +2,25 @@ import type { Logger } from 'pino'; import { logErr } from '../logging/logger.js'; import { thrownMessage } from './guards.js'; -function snippet(raw: string, n = 800): string { +const DEFAULT_WARN_EVERY_MS = 10 * 60_000; +const parseWarnAt = new Map(); + +export function snippet(raw: string, n = 800): string { return raw.length > n ? `${raw.slice(0, n)}...` : raw; } +/** Test-only: clear the parse-warn throttle state. */ +export function resetJsonParseWarnThrottlesForTests(): void { + parseWarnAt.clear(); +} + +function warnParseDegraded(log: Logger, context: string, raw: string, err: unknown, everyMs: number): void { + const now = Date.now(); + if (now < (parseWarnAt.get(context) ?? 0)) return; + parseWarnAt.set(context, now + everyMs); + log.warn({ degradedContext: context, degradedSnippet: snippet(raw), ...logErr(err) }, 'JSON parse degraded'); +} + export function parseJsonOrNull(opts: { raw: string; context: string; @@ -15,23 +30,27 @@ export function parseJsonOrNull(opts: { try { return JSON.parse(opts.raw) as unknown; } catch (e) { - if (opts.log) { - const warnEveryMs = opts.warnEveryMs ?? 10 * 60_000; - const l = opts.log as Logger & { __kaytooNextJsonWarnAtMs?: number }; - const now = Date.now(); - const nextAt = l.__kaytooNextJsonWarnAtMs ?? 0; - if (now >= nextAt) { - l.__kaytooNextJsonWarnAtMs = now + warnEveryMs; - opts.log.warn( - { degradedContext: opts.context, degradedSnippet: snippet(opts.raw), ...logErr(e) }, - 'JSON parse degraded', - ); - } - } + if (opts.log) warnParseDegraded(opts.log, opts.context, opts.raw, e, opts.warnEveryMs ?? DEFAULT_WARN_EVERY_MS); return null; } } +/** Lenient variant: strips fences, unwraps once-encoded strings, extracts the first JSON substring. */ +export function parseLenientOrNull(opts: { + raw: string; + context: string; + log?: Logger; + warnEveryMs?: number; +}): unknown | null { + const v = parseLenientTopLevelJson(opts.raw); + if (v !== null) return v; + if (opts.log) { + const err = new Error('unparseable JSON'); + warnParseDegraded(opts.log, opts.context, opts.raw, err, opts.warnEveryMs ?? DEFAULT_WARN_EVERY_MS); + } + return null; +} + export function parseJsonOrThrow(opts: { raw: string; context: string }): unknown { try { return JSON.parse(opts.raw) as unknown; diff --git a/test/chatMessageServerTimeMs.test.ts b/test/chatMessageServerTimeMs.test.ts new file mode 100644 index 0000000..71806de --- /dev/null +++ b/test/chatMessageServerTimeMs.test.ts @@ -0,0 +1,18 @@ +import { describe, expect, it } from 'vitest'; +import { chatMessageServerTimeMs } from '../src/util/chatMessageServerTimeMs.js'; + +describe('chatMessageServerTimeMs', () => { + it('parses Slack message.ts', () => { + expect(chatMessageServerTimeMs('1733263434.002345')).toBeCloseTo(1733263434002.345, 3); + }); + + it('parses ISO', () => { + expect(chatMessageServerTimeMs('2024-06-01T12:00:00.000Z')).toBe(Date.parse('2024-06-01T12:00:00.000Z')); + }); + + it('returns null when unparseable', () => { + expect(chatMessageServerTimeMs('')).toBeNull(); + expect(chatMessageServerTimeMs(' ')).toBeNull(); + expect(chatMessageServerTimeMs('not-a-timestamp')).toBeNull(); + }); +}); diff --git a/test/helpers/logging.ts b/test/helpers/logging.ts index ecb0247..ea29835 100644 --- a/test/helpers/logging.ts +++ b/test/helpers/logging.ts @@ -1,5 +1,7 @@ import pino from 'pino'; import { initLogging, resetLogging } from '../../src/logging/logger.js'; +import { resetChatUrlCacheForTests } from '../../src/llm/openaiCompat.js'; +import { resetJsonParseWarnThrottlesForTests } from '../../src/util/json.js'; /** Pino silent + reset between tests (Vitest hooks). */ export function useSilentLogging( @@ -8,6 +10,8 @@ export function useSilentLogging( ): void { beforeEach(() => { resetLogging(); + resetChatUrlCacheForTests(); + resetJsonParseWarnThrottlesForTests(); initLogging({ level: 'silent', redactPaths: [], @@ -16,5 +20,7 @@ export function useSilentLogging( }); afterEach(() => { resetLogging(); + resetChatUrlCacheForTests(); + resetJsonParseWarnThrottlesForTests(); }); } diff --git a/test/jsonUtil.branches.test.ts b/test/jsonUtil.branches.test.ts index d849d24..b0f5a7d 100644 --- a/test/jsonUtil.branches.test.ts +++ b/test/jsonUtil.branches.test.ts @@ -1,14 +1,18 @@ import type { Logger } from 'pino'; -import { describe, expect, it, vi } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { extractFirstJsonSubstring, parseJsonOrNull, parseJsonOrThrow, parseLenientTopLevelJson, + resetJsonParseWarnThrottlesForTests, stripMarkdownCodeFences, } from '../src/util/json.js'; describe('json util', () => { + beforeEach(() => resetJsonParseWarnThrottlesForTests()); + afterEach(() => resetJsonParseWarnThrottlesForTests()); + it('parseJsonOrThrow / parseJsonOrNull', () => { expect(parseJsonOrNull({ raw: '[1]', context: 'c' })).toEqual([1]); expect(parseJsonOrNull({ raw: '{', context: 'no-log' })).toBeNull(); @@ -17,14 +21,16 @@ describe('json util', () => { }); it('parseJsonOrNull warn rate limit', () => { + vi.useFakeTimers(); const warn = vi.fn(); const log = { warn } as unknown as Logger; parseJsonOrNull({ raw: '{', context: 'x', log, warnEveryMs: 60_000 }); parseJsonOrNull({ raw: '{', context: 'x', log, warnEveryMs: 60_000 }); expect(warn).toHaveBeenCalledTimes(1); - (log as Logger & { __kaytooNextJsonWarnAtMs?: number }).__kaytooNextJsonWarnAtMs = 0; + vi.advanceTimersByTime(60_001); parseJsonOrNull({ raw: '{', context: 'x', log, warnEveryMs: 60_000 }); expect(warn).toHaveBeenCalledTimes(2); + vi.useRealTimers(); }); it('stripMarkdownCodeFences', () => { diff --git a/test/matrixAdapter.test.ts b/test/matrixAdapter.test.ts index b245e54..396cb0c 100644 --- a/test/matrixAdapter.test.ts +++ b/test/matrixAdapter.test.ts @@ -43,6 +43,7 @@ function mkEvent(over: Partial<{ id: string; roomId: string; threadRootId: string | undefined; + ts: number; }>): MatrixEvent { const o = { status: null, @@ -53,6 +54,7 @@ function mkEvent(over: Partial<{ id: '$1', roomId: '!r:hs', threadRootId: undefined as string | undefined, + ts: 1_700_000_000_000, ...over, }; return { @@ -62,6 +64,7 @@ function mkEvent(over: Partial<{ getSender: () => o.sender, getId: () => o.id, getRoomId: () => o.roomId, + getTs: () => o.ts, threadRootId: o.threadRootId, } as MatrixEvent; } @@ -133,13 +136,14 @@ describe('startMatrixAdapter', () => { onEvent, }); const h = timelineHandlers[0]!; - await h(mkEvent({ body: 'ping', sender: '@peer:hs', id: '$e1' }), mkRoom('!room:hs'), false); + await h(mkEvent({ body: 'ping', sender: '@peer:hs', id: '$e1', ts: 1_720_000_000_000 }), mkRoom('!room:hs'), false); expect(onEvent).toHaveBeenCalledWith( expect.objectContaining({ platform: 'matrix', text: 'ping', address: expect.objectContaining({ channelId: '!room:hs', threadId: 'main' }), eventId: '$e1', + ts: new Date(1_720_000_000_000).toISOString(), }), ); await stop(); diff --git a/test/router.test.ts b/test/router.test.ts index 4999ea4..5e64862 100644 --- a/test/router.test.ts +++ b/test/router.test.ts @@ -76,6 +76,113 @@ describe('ChatRouter', () => { expect(posts.lines).toEqual(['agent reply']); }); + it('skips stale chat when ingestOpenedAtMs set', async () => { + const posts = { lines: [] as string[] }; + const notifier: Notifier = { + async post(input) { + posts.lines = [...posts.lines, input.text]; + }, + }; + const agent: AgentRuntime = { + async respond() { + return { text: 'should not run' }; + }, + async resetConversation() {}, + async getConversationDebug() { + return ''; + }, + }; + const ingestOpenedAtMs = Date.parse('2024-06-01T12:00:00.000Z'); + const router = new ChatRouter({ + notifier, + agent, + status: async () => 'ok', + ingestOpenedAtMs, + }); + + await router.handleEvent({ + type: 'message', + platform: 'slack', + address: { platform: 'slack', channelId: 'C1', threadId: 'T1' }, + user: { id: 'U1' }, + text: 'old backlog', + ts: '2024-06-01T11:00:00.000Z', + }); + + expect(posts.lines).toEqual([]); + }); + + it('routes chat at or after ingest open', async () => { + const posts = { lines: [] as string[] }; + const notifier: Notifier = { + async post(input) { + posts.lines = [...posts.lines, input.text]; + }, + }; + const agent: AgentRuntime = { + async respond() { + return { text: 'fresh reply' }; + }, + async resetConversation() {}, + async getConversationDebug() { + return ''; + }, + }; + const ingestOpenedAtMs = Date.parse('2024-06-01T12:00:00.000Z'); + const router = new ChatRouter({ + notifier, + agent, + status: async () => 'ok', + ingestOpenedAtMs, + }); + + await router.handleEvent({ + type: 'message', + platform: 'slack', + address: { platform: 'slack', channelId: 'C1', threadId: 'T1' }, + user: { id: 'U1' }, + text: 'new question', + ts: '2024-06-01T12:00:10.000Z', + }); + + expect(posts.lines).toEqual(['fresh reply']); + }); + + it('does not filter when ts unparseable', async () => { + const posts = { lines: [] as string[] }; + const notifier: Notifier = { + async post(input) { + posts.lines = [...posts.lines, input.text]; + }, + }; + const agent: AgentRuntime = { + async respond() { + return { text: 'parsed anyway' }; + }, + async resetConversation() {}, + async getConversationDebug() { + return ''; + }, + }; + const router = new ChatRouter({ + notifier, + agent, + status: async () => 'ok', + ingestOpenedAtMs: Date.now(), + }); + + await router.handleEvent({ + type: 'message', + platform: 'slack', + address: { platform: 'slack', channelId: 'C1', threadId: 'T1' }, + user: { id: 'U1' }, + text: 'hello', + ts: 'bogus-ts', + }); + + expect(posts.lines).toEqual(['parsed anyway']); + }); + it('does not reject when agent.respond throws', async () => { const notifier: Notifier = { async post() {} }; const agent: AgentRuntime = { diff --git a/test/topExternalDestinationsByBytes.test.ts b/test/topExternalDestinationsByBytes.test.ts new file mode 100644 index 0000000..fb0e5de --- /dev/null +++ b/test/topExternalDestinationsByBytes.test.ts @@ -0,0 +1,222 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { defaultAgentPolicy } from '../src/agent/policy.js'; +import * as fieldCaps from '../src/opensearch/fieldCaps.js'; +import { topExternalDestinationsByBytes } from '../src/agent/tools/handlers/topExternalDestinationsByBytes.js'; + +vi.mock('../src/opensearch/fieldCaps.js', async (importActual) => { + const actual = await importActual(); + return { ...actual, chooseFields: vi.fn() }; +}); + +const index = 'elastiflow-flow-codex-*'; +const baseFields = { + bytesField: 'flow.bytes', + srcIpField: 'source.ip', + dstIpField: 'dest.ip', + srcPortField: 'source.port', + dstPortField: 'dest.port', +}; + +describe('topExternalDestinationsByBytes', () => { + beforeEach(() => { + vi.mocked(fieldCaps.chooseFields).mockResolvedValue({ + ...baseFields, + podNameField: 'flow.client.k8s.pod.name', + clientNamespaceField: 'flow.client.k8s.namespace.name', + srcDisplayNameField: 'host.name', + }); + }); + + it('includes by_src nested aggs and maps topSources with labels', async () => { + const capsFromFields = (fields: string[]) => ({ + body: { fields: Object.fromEntries((fields ?? []).map((f) => [f, { keyword: { aggregatable: true } }])) }, + }); + const client = { + fieldCaps: vi.fn().mockImplementation((opts: { fields?: string[] }) => + Promise.resolve(capsFromFields(opts.fields ?? [])), + ), + search: vi.fn().mockResolvedValue({ + body: { + aggregations: { + by_dst: { + buckets: [ + { + key: '203.0.113.9', + doc_count: 20, + sum_bytes: { value: 5000 }, + top_ports: { buckets: [{ key: 443, doc_count: 10 }] }, + top_src_pods: { buckets: [{ key: 'pod-x', doc_count: 5 }] }, + top_src_namespaces: { buckets: [{ key: 'default', doc_count: 12 }] }, + by_src: { + buckets: [ + { + key: '192.168.1.50', + doc_count: 15, + sum_bytes: { value: 4000 }, + src_top_pods: { buckets: [{ key: 'pod-x', doc_count: 15 }] }, + src_top_namespaces: { buckets: [{ key: 'default', doc_count: 15 }] }, + src_top_display_names: { buckets: [{ key: 'client-host', doc_count: 14 }] }, + }, + ], + }, + }, + ], + }, + }, + }, + }), + }; + + const out = (await topExternalDestinationsByBytes( + { client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, + {}, + )) as { + destinations: Array<{ + dstIp: string; + topSources: Array<{ + srcIp: string; + bytes: number; + flows: number; + topPodNames?: { podName: string; docCount: number }[]; + topNamespaces?: { namespace: string; docCount: number }[]; + topSrcDisplayNames?: { displayName: string; docCount: number }[]; + }>; + }>; + }; + + const req = client.search.mock.calls[0]?.[0] as { + body?: { aggs?: { by_dst?: { aggs?: Record } } }; + }; + expect(req.body?.aggs?.by_dst?.aggs?.by_src).toMatchObject({ + terms: { field: 'source.ip', size: 5, order: { sum_bytes: 'desc' } }, + aggs: expect.objectContaining({ + src_top_display_names: { terms: { field: 'host.name', size: 1 } }, + src_top_pods: { terms: { field: 'flow.client.k8s.pod.name', size: 1 } }, + }), + }); + + expect(out.destinations[0]?.dstIp).toBe('203.0.113.9'); + expect(out.destinations[0]?.topSources[0]).toEqual({ + srcIp: '192.168.1.50', + bytes: 4000, + flows: 15, + topPodNames: [{ podName: 'pod-x', docCount: 15 }], + topNamespaces: [{ namespace: 'default', docCount: 15 }], + topSrcDisplayNames: [{ displayName: 'client-host', docCount: 14 }], + }); + }); + + it('omits src_top_display_names agg when display field matches pod field', async () => { + vi.mocked(fieldCaps.chooseFields).mockResolvedValue({ + ...baseFields, + podNameField: 'k8s.pod', + clientNamespaceField: 'k8s.ns', + srcDisplayNameField: 'k8s.pod', + }); + const podCaps = { + body: { + fields: { + 'k8s.pod': { keyword: { aggregatable: true } }, + 'k8s.pod.keyword': { keyword: { aggregatable: true } }, + 'k8s.ns': { keyword: { aggregatable: true } }, + 'dest.port': { keyword: { aggregatable: true } }, + }, + }, + }; + const client = { + fieldCaps: vi.fn().mockResolvedValue(podCaps), + search: vi.fn().mockResolvedValue({ + body: { + aggregations: { + by_dst: { + buckets: [ + { + key: '198.51.100.1', + doc_count: 2, + sum_bytes: { value: 50 }, + top_ports: { buckets: [] }, + top_src_pods: { buckets: [] }, + top_src_namespaces: { buckets: [] }, + by_src: { + buckets: [ + { + key: '10.0.0.2', + doc_count: 2, + sum_bytes: { value: 50 }, + src_top_pods: { buckets: [{ key: 'p1', doc_count: 2 }] }, + src_top_namespaces: { buckets: [] }, + }, + ], + }, + }, + ], + }, + }, + }, + }), + }; + + await topExternalDestinationsByBytes({ client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, {}); + + const req = client.search.mock.calls[0]?.[0] as { + body?: { aggs?: { by_dst?: { aggs?: { by_src?: { aggs?: Record } } } } }; + }; + expect(req.body?.aggs?.by_dst?.aggs?.by_src?.aggs?.src_top_display_names).toBeUndefined(); + }); + + it('omits src_top_display_names agg when display field matches namespace field', async () => { + vi.mocked(fieldCaps.chooseFields).mockResolvedValue({ + ...baseFields, + podNameField: 'k8s.pod', + clientNamespaceField: 'k8s.ns', + srcDisplayNameField: 'k8s.ns', + }); + const caps = { + body: { + fields: { + 'k8s.pod': { keyword: { aggregatable: true } }, + 'k8s.ns': { keyword: { aggregatable: true } }, + 'dest.port': { keyword: { aggregatable: true } }, + }, + }, + }; + const client = { + fieldCaps: vi.fn().mockResolvedValue(caps), + search: vi.fn().mockResolvedValue({ + body: { + aggregations: { + by_dst: { + buckets: [ + { + key: '198.51.100.2', + doc_count: 1, + sum_bytes: { value: 10 }, + top_ports: { buckets: [] }, + top_src_pods: { buckets: [] }, + top_src_namespaces: { buckets: [{ key: 'default', doc_count: 1 }] }, + by_src: { + buckets: [ + { + key: '10.0.0.3', + doc_count: 1, + sum_bytes: { value: 10 }, + src_top_namespaces: { buckets: [{ key: 'default', doc_count: 1 }] }, + }, + ], + }, + }, + ], + }, + }, + }, + }), + }; + + await topExternalDestinationsByBytes({ client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, {}); + + const req = client.search.mock.calls[0]?.[0] as { + body?: { aggs?: { by_dst?: { aggs?: { by_src?: { aggs?: Record } } } } }; + }; + expect(req.body?.aggs?.by_dst?.aggs?.by_src?.aggs?.src_top_display_names).toBeUndefined(); + }); +});