diff --git a/src/agent/prompts/agentPrompt.ts b/src/agent/prompts/agentPrompt.ts index abc9040..4baa5c7 100644 --- a/src/agent/prompts/agentPrompt.ts +++ b/src/agent/prompts/agentPrompt.ts @@ -80,7 +80,7 @@ function flowPlaybooksCompressed(): string[] { '- State plainly when expected fields are missing (AZ, nodes, duration, tcp flags, etc.).', '', 'Examples (not exhaustive):', - '- topTalkersByBytes: set includeDistinctPods when pod names matter.', + '- topTalkersByBytes: prefer topSrcDisplayNames in prose when set; includeDistinctPods only for pod cardinality.', '- namespaceTrafficMatrix: compare internal vs external bytes by namespace.', '- egressBytesVsBaseline vs egressSpikeDrilldown: table-only baseline vs per-source top destinations ' + 'in one call.', diff --git a/src/agent/tools/handlers/topTalkersByBytes.ts b/src/agent/tools/handlers/topTalkersByBytes.ts index 8b6319f..a5fc273 100644 --- a/src/agent/tools/handlers/topTalkersByBytes.ts +++ b/src/agent/tools/handlers/topTalkersByBytes.ts @@ -41,10 +41,15 @@ export async function topTalkersByBytes( return undefined; }; - const [podNameAggField, nsAggField] = await Promise.all([ + const [podNameAggField, nsAggField, displayNameAggField] = await Promise.all([ pickAggField(fields.podNameField), pickAggField(fields.clientNamespaceField), + pickAggField(fields.srcDisplayNameField), ]); + const displayAggField = + displayNameAggField && displayNameAggField !== podNameAggField && displayNameAggField !== nsAggField + ? displayNameAggField + : undefined; const bySrcAggs: Record = { sum_bytes: { sum: { field: fields.bytesField } }, @@ -58,6 +63,9 @@ export async function topTalkersByBytes( if (nsAggField) { bySrcAggs.top_namespaces = { terms: { field: nsAggField, size: 3 } }; } + if (displayAggField) { + bySrcAggs.top_display_names = { terms: { field: displayAggField, size: 3 } }; + } const { body } = await ctx.client.search({ index, @@ -106,6 +114,12 @@ export async function topTalkersByBytes( docCount: getNumber(nb['doc_count']), })); } + if (displayAggField) { + row.topSrcDisplayNames = getAggBuckets(b, ['top_display_names', 'buckets']).map((db) => ({ + displayName: getString(db['key']), + docCount: getNumber(db['doc_count']), + })); + } return row; }), }; diff --git a/src/agent/tools/toolSpecs.ts b/src/agent/tools/toolSpecs.ts index 21a158d..a8b6626 100644 --- a/src/agent/tools/toolSpecs.ts +++ b/src/agent/tools/toolSpecs.ts @@ -76,7 +76,7 @@ export const coreToolSpecs: readonly CoreToolSpec[] = [ { name: 'topTalkersByBytes', description: - 'Top source IPs by bytes; includes top pod names / namespaces when mapped; includeDistinctPods adds approximate distinct pod-name cardinality.', + 'Top sources by bytes. topSrcDisplayNames when srcDisplayNameField aggregates and differs from pod/namespace terms aggs; topPodNames/topNamespaces; includeDistinctPods for pod cardinality.', argsSchema: { type: 'object', properties: { diff --git a/test/topTalkersByBytes.test.ts b/test/topTalkersByBytes.test.ts new file mode 100644 index 0000000..41e08c6 --- /dev/null +++ b/test/topTalkersByBytes.test.ts @@ -0,0 +1,169 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { defaultAgentPolicy } from '../src/agent/policy.js'; +import * as fieldCaps from '../src/opensearch/fieldCaps.js'; +import { topTalkersByBytes } from '../src/agent/tools/handlers/topTalkersByBytes.js'; + +vi.mock('../src/opensearch/fieldCaps.js', async (importActual) => { + const actual = await importActual(); + return { ...actual, chooseFields: vi.fn() }; +}); + +const index = 'elastiflow-flow-codex-*'; +const baseFields = { + bytesField: 'flow.bytes', + srcIpField: 'source.ip', + dstIpField: 'dest.ip', + srcPortField: 'source.port', + dstPortField: 'dest.port', +}; + +function bySrcAggs(client: { search: ReturnType }) { + return (client.search.mock.calls[0]?.[0] as { body?: { aggs?: { by_src?: { aggs?: Record } } } }) + .body?.aggs?.by_src?.aggs; +} + +describe('topTalkersByBytes', () => { + beforeEach(() => { + vi.mocked(fieldCaps.chooseFields).mockResolvedValue({ ...baseFields, srcDisplayNameField: 'host.name' }); + }); + + it('adds top_display_names and topSrcDisplayNames when display field aggregates', async () => { + const capsFromFields = (fields: string[]) => ({ + body: { fields: Object.fromEntries(fields.map((f) => [f, { keyword: { aggregatable: true } }])) }, + }); + const client = { + fieldCaps: vi.fn().mockImplementation((opts: { fields?: string[] }) => + Promise.resolve(capsFromFields(opts.fields ?? [])), + ), + search: vi.fn().mockResolvedValue({ + body: { + aggregations: { + by_src: { + buckets: [ + { + key: '192.168.1.1', + doc_count: 10, + sum_bytes: { value: 900 }, + top_display_names: { buckets: [{ key: 'workstation.local', doc_count: 9 }] }, + }, + ], + }, + }, + }, + }), + }; + + const out = (await topTalkersByBytes({ client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, {})) as { + talkers: Array<{ srcIp: string; topSrcDisplayNames?: { displayName: string; docCount: number }[] }>; + }; + + expect(bySrcAggs(client)).toMatchObject({ + top_display_names: { terms: { field: 'host.name', size: 3 } }, + }); + expect(out.talkers[0]?.srcIp).toBe('192.168.1.1'); + expect(out.talkers[0]?.topSrcDisplayNames).toEqual([{ displayName: 'workstation.local', docCount: 9 }]); + }); + + it('skips top_display_names when same field as pod terms', async () => { + vi.mocked(fieldCaps.chooseFields).mockResolvedValue({ + ...baseFields, + podNameField: 'k8s.pod', + srcDisplayNameField: 'k8s.pod', + }); + const podCaps = { + body: { + fields: { + 'k8s.pod': { keyword: { aggregatable: true } }, + 'k8s.pod.keyword': { keyword: { aggregatable: true } }, + }, + }, + }; + const client = { + fieldCaps: vi.fn().mockResolvedValue(podCaps), + search: vi.fn().mockResolvedValue({ + body: { + aggregations: { + by_src: { + buckets: [ + { + key: '10.0.0.1', + doc_count: 3, + sum_bytes: { value: 100 }, + top_pods: { buckets: [{ key: 'pod-a', doc_count: 3 }] }, + }, + ], + }, + }, + }, + }), + }; + + await topTalkersByBytes({ client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, {}); + + expect(bySrcAggs(client)?.top_display_names).toBeUndefined(); + }); + + it('omits display agg when field caps mark srcDisplayNameField non-aggregatable', async () => { + const client = { + fieldCaps: vi.fn().mockResolvedValue({ + body: { fields: { 'host.name': { keyword: { aggregatable: false } } } }, + }), + search: vi.fn().mockResolvedValue({ + body: { + aggregations: { + by_src: { + buckets: [{ key: '192.168.1.2', doc_count: 5, sum_bytes: { value: 50 } }], + }, + }, + }, + }), + }; + + const out = (await topTalkersByBytes({ client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, {})) as { + talkers: Array>; + }; + + expect(bySrcAggs(client)?.top_display_names).toBeUndefined(); + expect(out.talkers[0]).not.toHaveProperty('topSrcDisplayNames'); + }); + + it('skips top_display_names when display terms field matches namespace terms field', async () => { + vi.mocked(fieldCaps.chooseFields).mockResolvedValue({ + ...baseFields, + clientNamespaceField: 'k8s.ns', + srcDisplayNameField: 'k8s.ns', + }); + const caps = { + body: { + fields: { + 'k8s.ns': { keyword: { aggregatable: true } }, + 'k8s.ns.keyword': { keyword: { aggregatable: true } }, + }, + }, + }; + const client = { + fieldCaps: vi.fn().mockResolvedValue(caps), + search: vi.fn().mockResolvedValue({ + body: { + aggregations: { + by_src: { + buckets: [ + { + key: '10.0.0.2', + doc_count: 2, + sum_bytes: { value: 20 }, + top_namespaces: { buckets: [{ key: 'default', doc_count: 2 }] }, + }, + ], + }, + }, + }, + }), + }; + + await topTalkersByBytes({ client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, {}); + + expect(bySrcAggs(client)?.top_display_names).toBeUndefined(); + expect(bySrcAggs(client)?.top_namespaces).toMatchObject({ terms: { field: 'k8s.ns', size: 3 } }); + }); +});