Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/agent/prompts/agentPrompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ function flowPlaybooksCompressed(): string[] {
'- State plainly when expected fields are missing (AZ, nodes, duration, tcp flags, etc.).',
'',
'Examples (not exhaustive):',
'- topTalkersByBytes: set includeDistinctPods when pod names matter.',
'- topTalkersByBytes: prefer topSrcDisplayNames in prose when set; includeDistinctPods only for pod cardinality.',
'- namespaceTrafficMatrix: compare internal vs external bytes by namespace.',
'- egressBytesVsBaseline vs egressSpikeDrilldown: table-only baseline vs per-source top destinations ' +
'in one call.',
Expand Down
16 changes: 15 additions & 1 deletion src/agent/tools/handlers/topTalkersByBytes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,15 @@ export async function topTalkersByBytes(
return undefined;
};

const [podNameAggField, nsAggField] = await Promise.all([
const [podNameAggField, nsAggField, displayNameAggField] = await Promise.all([
pickAggField(fields.podNameField),
pickAggField(fields.clientNamespaceField),
pickAggField(fields.srcDisplayNameField),
]);
const displayAggField =
displayNameAggField && displayNameAggField !== podNameAggField && displayNameAggField !== nsAggField
? displayNameAggField
: undefined;

const bySrcAggs: Record<string, unknown> = {
sum_bytes: { sum: { field: fields.bytesField } },
Expand All @@ -58,6 +63,9 @@ export async function topTalkersByBytes(
if (nsAggField) {
bySrcAggs.top_namespaces = { terms: { field: nsAggField, size: 3 } };
}
if (displayAggField) {
bySrcAggs.top_display_names = { terms: { field: displayAggField, size: 3 } };
}

const { body } = await ctx.client.search({
index,
Expand Down Expand Up @@ -106,6 +114,12 @@ export async function topTalkersByBytes(
docCount: getNumber(nb['doc_count']),
}));
}
if (displayAggField) {
row.topSrcDisplayNames = getAggBuckets(b, ['top_display_names', 'buckets']).map((db) => ({
displayName: getString(db['key']),
docCount: getNumber(db['doc_count']),
}));
}
return row;
}),
};
Expand Down
2 changes: 1 addition & 1 deletion src/agent/tools/toolSpecs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export const coreToolSpecs: readonly CoreToolSpec[] = [
{
name: 'topTalkersByBytes',
description:
'Top source IPs by bytes; includes top pod names / namespaces when mapped; includeDistinctPods adds approximate distinct pod-name cardinality.',
'Top sources by bytes. topSrcDisplayNames when srcDisplayNameField aggregates and differs from pod/namespace terms aggs; topPodNames/topNamespaces; includeDistinctPods for pod cardinality.',
argsSchema: {
type: 'object',
properties: {
Expand Down
169 changes: 169 additions & 0 deletions test/topTalkersByBytes.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { defaultAgentPolicy } from '../src/agent/policy.js';
import * as fieldCaps from '../src/opensearch/fieldCaps.js';
import { topTalkersByBytes } from '../src/agent/tools/handlers/topTalkersByBytes.js';

vi.mock('../src/opensearch/fieldCaps.js', async (importActual) => {
const actual = await importActual<typeof import('../src/opensearch/fieldCaps.js')>();
return { ...actual, chooseFields: vi.fn() };
});

const index = 'elastiflow-flow-codex-*';
const baseFields = {
bytesField: 'flow.bytes',
srcIpField: 'source.ip',
dstIpField: 'dest.ip',
srcPortField: 'source.port',
dstPortField: 'dest.port',
};

function bySrcAggs(client: { search: ReturnType<typeof vi.fn> }) {
return (client.search.mock.calls[0]?.[0] as { body?: { aggs?: { by_src?: { aggs?: Record<string, unknown> } } } })
.body?.aggs?.by_src?.aggs;
}

describe('topTalkersByBytes', () => {
beforeEach(() => {
vi.mocked(fieldCaps.chooseFields).mockResolvedValue({ ...baseFields, srcDisplayNameField: 'host.name' });
});

it('adds top_display_names and topSrcDisplayNames when display field aggregates', async () => {
const capsFromFields = (fields: string[]) => ({
body: { fields: Object.fromEntries(fields.map((f) => [f, { keyword: { aggregatable: true } }])) },
});
const client = {
fieldCaps: vi.fn().mockImplementation((opts: { fields?: string[] }) =>
Promise.resolve(capsFromFields(opts.fields ?? [])),
),
search: vi.fn().mockResolvedValue({
body: {
aggregations: {
by_src: {
buckets: [
{
key: '192.168.1.1',
doc_count: 10,
sum_bytes: { value: 900 },
top_display_names: { buckets: [{ key: 'workstation.local', doc_count: 9 }] },
},
],
},
},
},
}),
};

const out = (await topTalkersByBytes({ client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, {})) as {
talkers: Array<{ srcIp: string; topSrcDisplayNames?: { displayName: string; docCount: number }[] }>;
};

expect(bySrcAggs(client)).toMatchObject({
top_display_names: { terms: { field: 'host.name', size: 3 } },
});
expect(out.talkers[0]?.srcIp).toBe('192.168.1.1');
expect(out.talkers[0]?.topSrcDisplayNames).toEqual([{ displayName: 'workstation.local', docCount: 9 }]);
});

it('skips top_display_names when same field as pod terms', async () => {
vi.mocked(fieldCaps.chooseFields).mockResolvedValue({
...baseFields,
podNameField: 'k8s.pod',
srcDisplayNameField: 'k8s.pod',
});
const podCaps = {
body: {
fields: {
'k8s.pod': { keyword: { aggregatable: true } },
'k8s.pod.keyword': { keyword: { aggregatable: true } },
},
},
};
const client = {
fieldCaps: vi.fn().mockResolvedValue(podCaps),
search: vi.fn().mockResolvedValue({
body: {
aggregations: {
by_src: {
buckets: [
{
key: '10.0.0.1',
doc_count: 3,
sum_bytes: { value: 100 },
top_pods: { buckets: [{ key: 'pod-a', doc_count: 3 }] },
},
],
},
},
},
}),
};

await topTalkersByBytes({ client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, {});

expect(bySrcAggs(client)?.top_display_names).toBeUndefined();
});

it('omits display agg when field caps mark srcDisplayNameField non-aggregatable', async () => {
const client = {
fieldCaps: vi.fn().mockResolvedValue({
body: { fields: { 'host.name': { keyword: { aggregatable: false } } } },
}),
search: vi.fn().mockResolvedValue({
body: {
aggregations: {
by_src: {
buckets: [{ key: '192.168.1.2', doc_count: 5, sum_bytes: { value: 50 } }],
},
},
},
}),
};

const out = (await topTalkersByBytes({ client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, {})) as {
talkers: Array<Record<string, unknown>>;
};

expect(bySrcAggs(client)?.top_display_names).toBeUndefined();
expect(out.talkers[0]).not.toHaveProperty('topSrcDisplayNames');
});

it('skips top_display_names when display terms field matches namespace terms field', async () => {
vi.mocked(fieldCaps.chooseFields).mockResolvedValue({
...baseFields,
clientNamespaceField: 'k8s.ns',
srcDisplayNameField: 'k8s.ns',
});
const caps = {
body: {
fields: {
'k8s.ns': { keyword: { aggregatable: true } },
'k8s.ns.keyword': { keyword: { aggregatable: true } },
},
},
};
const client = {
fieldCaps: vi.fn().mockResolvedValue(caps),
search: vi.fn().mockResolvedValue({
body: {
aggregations: {
by_src: {
buckets: [
{
key: '10.0.0.2',
doc_count: 2,
sum_bytes: { value: 20 },
top_namespaces: { buckets: [{ key: 'default', doc_count: 2 }] },
},
],
},
},
},
}),
};

await topTalkersByBytes({ client: client as never, policy: defaultAgentPolicy, defaultIndex: index }, {});

expect(bySrcAggs(client)?.top_display_names).toBeUndefined();
expect(bySrcAggs(client)?.top_namespaces).toMatchObject({ terms: { field: 'k8s.ns', size: 3 } });
});
});