Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 8 additions & 30 deletions src/agent/agentLoop.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Logger } from 'pino';
import { logErr, withDurationMs } from '../logging/logger.js';
import { withDurationMs } from '../logging/logger.js';
import { isRecord } from '../util/guards.js';
import { parseLenientTopLevelJson } from '../util/json.js';
import { parseLenientOrNull } from '../util/json.js';
import type { ChatMessage, LlmClient } from '../llm/types.js';
import type { ConversationTurn } from '../storage/conversationStore.js';
import type { ToolRegistry, ToolResult } from './tools/types.js';
Expand Down Expand Up @@ -39,11 +39,11 @@ export async function runAgentLoop(opts: {
const { content } = await withDurationMs(opts.log, 'agent.chat_completions', () =>
opts.llm.chatCompletions({ messages, temperature: 0.2 }),
);
const parsed = safeJsonParse({ raw: content, log: opts.log, context: 'agent.response_parse' });
const parsed = parseLenientOrNull({ raw: content, log: opts.log, context: 'agent.response_parse' });
if (!isRecord(parsed)) {
if (looksLikeToolCalls(content) && !repairAttempted) {
const repaired = await repairInvalidToolCalls({ llm: opts.llm, messages, badText: content });
const repairedParsed = safeJsonParse({ raw: repaired, log: opts.log, context: 'agent.response_repair_parse' });
const repairedParsed = parseLenientOrNull({ raw: repaired, log: opts.log, context: 'agent.response_repair_parse' });
if (isRecord(repairedParsed)) {
const toolCalls2 = normalizeToolCalls(repairedParsed['tool_calls'], opts.log, 'agent.tool_calls_repair_parse');
if (Array.isArray(toolCalls2) && toolCalls2.length > 0) {
Expand Down Expand Up @@ -72,7 +72,7 @@ export async function runAgentLoop(opts: {
}
if (looksLikeToolCalls(reply) && !repairAttempted) {
const repaired = await repairInvalidToolCalls({ llm: opts.llm, messages, badText: reply });
const repairedParsed = safeJsonParse({ raw: repaired, log: opts.log, context: 'agent.reply_repair_parse' });
const repairedParsed = parseLenientOrNull({ raw: repaired, log: opts.log, context: 'agent.reply_repair_parse' });
if (isRecord(repairedParsed)) {
const toolCalls2 = normalizeToolCalls(
repairedParsed['tool_calls'],
Expand Down Expand Up @@ -123,14 +123,15 @@ async function repairInvalidToolCalls(opts: { llm: LlmClient; messages: ChatMess
/**
 * Coerces a candidate `tool_calls` value into an array of raw tool-call entries.
 *
 * - An array is returned unchanged.
 * - A string is handed to `parseLenientOrNull`; the result is accepted when it is
 *   either an array, or a record whose `tool_calls` property is an array.
 * - Anything else yields `null`.
 *
 * @param v - Value found where tool calls were expected.
 * @param log - Logger forwarded to `parseLenientOrNull`.
 * @param context - Label forwarded to `parseLenientOrNull`.
 * @returns The tool-call array, or `null` when none could be recovered.
 */
function normalizeToolCalls(v: unknown, log: Logger, context: string): unknown[] | null {
  if (Array.isArray(v)) return v;
  if (typeof v !== 'string') return null;
  // Parse exactly once; `parsed` must be declared a single time in this scope.
  const parsed = parseLenientOrNull({ raw: v, log, context });
  if (Array.isArray(parsed)) return parsed as unknown[];
  // Some payloads wrap the array in an object: { "tool_calls": [...] }.
  if (isRecord(parsed) && Array.isArray(parsed['tool_calls'])) return parsed['tool_calls'] as unknown[];
  return null;
}

/**
 * Pulls a `tool_calls` array out of free-form model text.
 *
 * The text is only handed to `parseLenientOrNull` when `looksLikeToolCalls`
 * accepts it; text that fails that check returns `null` without being parsed.
 *
 * @param raw - Text to inspect.
 * @param log - Logger forwarded to `parseLenientOrNull`.
 * @param context - Label forwarded to `parseLenientOrNull`.
 * @returns The `tool_calls` array when the parsed value is a record carrying one, otherwise `null`.
 */
function extractToolCallsFromText(raw: string, log: Logger, context: string): unknown[] | null {
  // Guard first: the parser is never invoked for text that does not look like tool calls.
  if (!looksLikeToolCalls(raw)) return null;
  // Single declaration of `parsed`; a second `const parsed` in this scope would not compile.
  const parsed = parseLenientOrNull({ raw, log, context });
  if (isRecord(parsed) && Array.isArray(parsed['tool_calls'])) return parsed['tool_calls'] as unknown[];
  return null;
}
Expand All @@ -150,29 +151,6 @@ function compressToMaxLines(text: string, maxLines: number): string {
return [...head, '... (truncated; share vendor/platform + timestamps for deeper triage)'].join('\n');
}

/** Module-wide throttle state: earliest epoch-ms at which the next degraded-parse warning may be emitted. */
const jsonParseWarnAt: { nextAtMs: number } = { nextAtMs: 0 };

/**
 * Emits a throttled warning that a JSON parse fell back to the degraded path.
 *
 * After a warning is written, further calls are silent until ten minutes have
 * passed (tracked in `jsonParseWarnAt`, shared by every caller and context).
 * The logged snippet is the raw text, cut to its first 800 characters plus
 * `...` when longer.
 *
 * @param opts.log - Logger whose `warn` receives the structured payload.
 * @param opts.context - Reported as `degradedContext`.
 * @param opts.raw - Text that failed to parse; reported (possibly truncated) as `degradedSnippet`.
 * @param opts.err - Passed through `logErr` and merged into the payload.
 */
function warnJsonParseDegraded(opts: { log: Logger; context: string; raw: string; err: unknown }): void {
  const { log, context, raw, err } = opts;
  const nowMs = Date.now();

  if (nowMs >= jsonParseWarnAt.nextAtMs) {
    // Arm the throttle before logging so the window starts at this call.
    jsonParseWarnAt.nextAtMs = nowMs + 10 * 60_000;

    let degradedSnippet = raw;
    if (raw.length > 800) {
      degradedSnippet = `${raw.slice(0, 800)}...`;
    }

    // `logErr` fields are spread last, matching the original key precedence.
    const payload = {
      degradedContext: context,
      degradedSnippet,
      ...logErr(err),
    };
    log.warn(payload, 'agent JSON parse degraded (falling back)');
  }
}

/**
 * Parses `opts.raw` with `parseLenientTopLevelJson` and reports failures.
 *
 * A `null` result from the parser is treated as "unparseable": a degraded-parse
 * warning is requested via `warnJsonParseDegraded` and `null` is returned.
 *
 * @param opts.raw - Text to parse.
 * @param opts.log - Logger forwarded to the warning helper.
 * @param opts.context - Label forwarded to the warning helper.
 * @returns The parsed value, or `null` when the parser produced `null`.
 */
function safeJsonParse(opts: { raw: string; log: Logger; context: string }): unknown {
  const { raw, log, context } = opts;
  const value = parseLenientTopLevelJson(raw);
  if (value === null) {
    warnJsonParseDegraded({ log, context, raw, err: new Error('unparseable JSON') });
  }
  return value;
}

async function runToolCalls(opts: {
tools: ToolRegistry;
toolCalls: unknown[];
Expand Down
1 change: 1 addition & 0 deletions src/agent/prompts/agentPrompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ function flowPlaybooksCompressed(): string[] {
'',
'Examples (not exhaustive):',
'- topTalkersByBytes: prefer topSrcDisplayNames in prose when set; includeDistinctPods only for pod cardinality.',
'- topExternalDestinationsByBytes: for who egresses to a given external IP, use topSources; prefer topSrcDisplayNames per source when set, else srcIp.',
'- namespaceTrafficMatrix: compare internal vs external bytes by namespace.',
'- egressBytesVsBaseline vs egressSpikeDrilldown: table-only baseline vs per-source top destinations ' +
'in one call.',
Expand Down
34 changes: 34 additions & 0 deletions src/agent/tools/handlers/pickAggregatableField.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import type { SearchClient } from '../../../search/types.js';

/**
 * Chooses which variant of a field can be used in an aggregation.
 *
 * Issues one `fieldCaps` request covering `field` and `${field}.keyword`, then
 * returns the first of those two (in that order) for which at least one
 * per-type entry in the response has `aggregatable === true`.
 *
 * @param client - Search client used for the `fieldCaps` call.
 * @param index - Index expression passed to `fieldCaps`.
 * @param field - Base field name; when falsy no request is made.
 * @returns `field`, `${field}.keyword`, or `undefined` when neither is reported aggregatable.
 */
export async function pickAggregatableField({
  client,
  index,
  field,
}: {
  client: SearchClient;
  index: string;
  field: string | undefined;
}): Promise<string | undefined> {
  if (!field) return undefined;

  const keywordField = `${field}.keyword`;
  const response = await client.fieldCaps({
    index,
    fields: [field, keywordField],
    ignore_unavailable: true,
    allow_no_indices: true,
  });
  const capsByField = (response.body as { fields?: Record<string, unknown> }).fields ?? {};

  // Preference order: the field itself first, its `.keyword` variant second.
  for (const candidate of [field, keywordField]) {
    const perType = capsByField[candidate];
    if (!perType || typeof perType !== 'object') continue;

    // Each value is the capability record for one mapping type of this field.
    for (const typeCaps of Object.values(perType as Record<string, unknown>)) {
      if (!typeCaps || typeof typeCaps !== 'object') continue;
      if ((typeCaps as { aggregatable?: unknown }).aggregatable === true) return candidate;
    }
  }
  return undefined;
}
74 changes: 65 additions & 9 deletions src/agent/tools/handlers/topExternalDestinationsByBytes.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import type { Client } from '@opensearch-project/opensearch';
import { externalDestinationIpBool } from '../../../opensearch/queries/destinationIp.js';
import type { SearchClient } from '../../../search/types.js';
import { getNumber, getString } from '../../../util/guards.js';
import type { AgentPolicy } from '../../policy.js';
import { clampBucketSize, type AgentPolicy } from '../../policy.js';
import { getAggBuckets, getNested } from '../helpers.js';
import { resolveAggToolContext } from './common.js';
import { pickAggregatableField } from './pickAggregatableField.js';

export async function topExternalDestinationsByBytes(
ctx: { client: Client; policy: AgentPolicy; defaultIndex: string },
ctx: { client: SearchClient; policy: AgentPolicy; defaultIndex: string },
args: Record<string, unknown>,
): Promise<unknown> {
const { index, fields, minutesBack, size } = await resolveAggToolContext({
Expand All @@ -16,8 +17,27 @@ export async function topExternalDestinationsByBytes(
defaultSize: 10,
});

const srcPodField = fields.podNameField;
const srcNsField = fields.clientNamespaceField;
const pick = (field: string | undefined) =>
pickAggregatableField({ client: ctx.client, index, field });

const [podNameAggField, nsAggField, displayNameAggField, dstPortAggField] = await Promise.all([
pick(fields.podNameField),
pick(fields.clientNamespaceField),
pick(fields.srcDisplayNameField),
pick(fields.dstPortField),
]);
const displayAggField =
displayNameAggField && displayNameAggField !== podNameAggField && displayNameAggField !== nsAggField
? displayNameAggField
: undefined;
const dstPortTermsField = dstPortAggField ?? fields.dstPortField;

const bySrcLeafAggs: Record<string, unknown> = {
sum_bytes: { sum: { field: fields.bytesField } },
...(podNameAggField ? { src_top_pods: { terms: { field: podNameAggField, size: 1 } } } : {}),
...(nsAggField ? { src_top_namespaces: { terms: { field: nsAggField, size: 1 } } } : {}),
...(displayAggField ? { src_top_display_names: { terms: { field: displayAggField, size: 1 } } } : {}),
};

const { body } = await ctx.client.search({
index,
Expand All @@ -36,9 +56,17 @@ export async function topExternalDestinationsByBytes(
terms: { field: fields.dstIpField, size, order: { sum_bytes: 'desc' } },
aggs: {
sum_bytes: { sum: { field: fields.bytesField } },
...(fields.dstPortField ? { top_ports: { terms: { field: fields.dstPortField, size: 3 } } } : {}),
...(srcPodField ? { top_src_pods: { terms: { field: srcPodField, size: 3 } } } : {}),
...(srcNsField ? { top_src_namespaces: { terms: { field: srcNsField, size: 3 } } } : {}),
top_ports: { terms: { field: dstPortTermsField, size: 3 } },
...(podNameAggField ? { top_src_pods: { terms: { field: podNameAggField, size: 3 } } } : {}),
...(nsAggField ? { top_src_namespaces: { terms: { field: nsAggField, size: 3 } } } : {}),
by_src: {
terms: {
field: fields.srcIpField,
size: clampBucketSize(5, ctx.policy),
order: { sum_bytes: 'desc' },
},
aggs: bySrcLeafAggs,
},
},
},
},
Expand All @@ -65,7 +93,35 @@ export async function topExternalDestinationsByBytes(
namespace: getString(nb['key']),
flows: getNumber(nb['doc_count']),
})),
topSources: getAggBuckets(b, ['by_src', 'buckets']).map((sb) => ({
srcIp: getString(sb['key']),
bytes: getNumber(getNested(sb, ['sum_bytes', 'value'])),
flows: getNumber(sb['doc_count']),
...(podNameAggField
? {
topPodNames: getAggBuckets(sb, ['src_top_pods', 'buckets']).map((pb) => ({
podName: getString(pb['key']),
docCount: getNumber(pb['doc_count']),
})),
}
: {}),
...(nsAggField
? {
topNamespaces: getAggBuckets(sb, ['src_top_namespaces', 'buckets']).map((nb) => ({
namespace: getString(nb['key']),
docCount: getNumber(nb['doc_count']),
})),
}
: {}),
...(displayAggField
? {
topSrcDisplayNames: getAggBuckets(sb, ['src_top_display_names', 'buckets']).map((db) => ({
displayName: getString(db['key']),
docCount: getNumber(db['doc_count']),
})),
}
: {}),
})),
})),
};
}

113 changes: 44 additions & 69 deletions src/agent/tools/handlers/topTalkersByBytes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { SearchClient } from '../../../search/types.js';
import type { AgentPolicy } from '../../policy.js';
import { getAggBuckets, getNested } from '../helpers.js';
import { resolveAggToolContext } from './common.js';
import { pickAggregatableField } from './pickAggregatableField.js';

export async function topTalkersByBytes(
ctx: { client: SearchClient; policy: AgentPolicy; defaultIndex: string },
Expand All @@ -16,35 +17,12 @@ export async function topTalkersByBytes(
});
const includeDistinctPods = args.includeDistinctPods === true;

const pickAggField = async (field: string | undefined): Promise<string | undefined> => {
if (!field) return undefined;
const keyword = `${field}.keyword`;
const resp = await ctx.client.fieldCaps({
index,
fields: [field, keyword],
ignore_unavailable: true,
allow_no_indices: true,
});
const caps = (resp.body as { fields?: Record<string, unknown> }).fields ?? {};

const isAggregatable = (f: string): boolean => {
const entry = caps[f];
if (!entry || typeof entry !== 'object') return false;
return Object.values(entry as Record<string, unknown>).some((t) => {
if (!t || typeof t !== 'object') return false;
return (t as { aggregatable?: unknown }).aggregatable === true;
});
};

if (isAggregatable(field)) return field;
if (isAggregatable(keyword)) return keyword;
return undefined;
};

const pick = (field: string | undefined) =>
pickAggregatableField({ client: ctx.client, index, field });
const [podNameAggField, nsAggField, displayNameAggField] = await Promise.all([
pickAggField(fields.podNameField),
pickAggField(fields.clientNamespaceField),
pickAggField(fields.srcDisplayNameField),
pick(fields.podNameField),
pick(fields.clientNamespaceField),
pick(fields.srcDisplayNameField),
]);
const displayAggField =
displayNameAggField && displayNameAggField !== podNameAggField && displayNameAggField !== nsAggField
Expand All @@ -53,19 +31,13 @@ export async function topTalkersByBytes(

const bySrcAggs: Record<string, unknown> = {
sum_bytes: { sum: { field: fields.bytesField } },
...(podNameAggField ? { top_pods: { terms: { field: podNameAggField, size: 3 } } } : {}),
...(podNameAggField && includeDistinctPods
? { distinct_pods: { cardinality: { field: podNameAggField, precision_threshold: 2000 } } }
: {}),
...(nsAggField ? { top_namespaces: { terms: { field: nsAggField, size: 3 } } } : {}),
...(displayAggField ? { top_display_names: { terms: { field: displayAggField, size: 3 } } } : {}),
};
if (podNameAggField) {
bySrcAggs.top_pods = { terms: { field: podNameAggField, size: 3 } };
if (includeDistinctPods) {
bySrcAggs.distinct_pods = { cardinality: { field: podNameAggField, precision_threshold: 2000 } };
}
}
if (nsAggField) {
bySrcAggs.top_namespaces = { terms: { field: nsAggField, size: 3 } };
}
if (displayAggField) {
bySrcAggs.top_display_names = { terms: { field: displayAggField, size: 3 } };
}

const { body } = await ctx.client.search({
index,
Expand Down Expand Up @@ -93,34 +65,37 @@ export async function topTalkersByBytes(
return {
index,
minutesBack,
talkers: buckets.map((b) => {
const row: Record<string, unknown> = {
srcIp: getString(b['key']),
bytes: getNumber(getNested(b, ['sum_bytes', 'value'])),
docCount: getNumber(b['doc_count']),
};
if (podNameAggField && includeDistinctPods) {
row.distinctPodNamesApprox = getNumber(getNested(b, ['distinct_pods', 'value']));
}
if (podNameAggField) {
row.topPodNames = getAggBuckets(b, ['top_pods', 'buckets']).map((pb) => ({
podName: getString(pb['key']),
docCount: getNumber(pb['doc_count']),
}));
}
if (nsAggField) {
row.topNamespaces = getAggBuckets(b, ['top_namespaces', 'buckets']).map((nb) => ({
namespace: getString(nb['key']),
docCount: getNumber(nb['doc_count']),
}));
}
if (displayAggField) {
row.topSrcDisplayNames = getAggBuckets(b, ['top_display_names', 'buckets']).map((db) => ({
displayName: getString(db['key']),
docCount: getNumber(db['doc_count']),
}));
}
return row;
}),
talkers: buckets.map((b) => ({
srcIp: getString(b['key']),
bytes: getNumber(getNested(b, ['sum_bytes', 'value'])),
docCount: getNumber(b['doc_count']),
...(podNameAggField && includeDistinctPods
? { distinctPodNamesApprox: getNumber(getNested(b, ['distinct_pods', 'value'])) }
: {}),
...(podNameAggField
? {
topPodNames: getAggBuckets(b, ['top_pods', 'buckets']).map((pb) => ({
podName: getString(pb['key']),
docCount: getNumber(pb['doc_count']),
})),
}
: {}),
...(nsAggField
? {
topNamespaces: getAggBuckets(b, ['top_namespaces', 'buckets']).map((nb) => ({
namespace: getString(nb['key']),
docCount: getNumber(nb['doc_count']),
})),
}
: {}),
...(displayAggField
? {
topSrcDisplayNames: getAggBuckets(b, ['top_display_names', 'buckets']).map((db) => ({
displayName: getString(db['key']),
docCount: getNumber(db['doc_count']),
})),
}
: {}),
})),
};
}
3 changes: 2 additions & 1 deletion src/agent/tools/toolSpecs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ export const coreToolSpecs: readonly CoreToolSpec[] = [
},
{
name: 'topExternalDestinationsByBytes',
description: 'Top external destination IPs by egress bytes (RFC1918/CGNAT excluded) with top source pods/namespaces.',
description:
'Top external destination IPs by egress bytes (non-RFC1918/CGNAT). Per dst: topSources (srcIp, bytes, optional pod/ns/display from index), plus dst-level topSourcePods/topSourceNamespaces.',
argsSchema: {
type: 'object',
properties: {
Expand Down
Loading