Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export KUBECONFIG="$(pwd)/e2e/.generated/kubeconfig-kind-kaytoo-e2e"
npm run e2e:verify
```

`verify` starts a temporary chat port-forward if needed.
`verify` starts a temporary chat port-forward if needed. It also checks **OpenSearch Anomaly Detection** (`POST .../_plugins/_anomaly_detection/detectors/_search` returns 200), asserts the **Kaytoo flow egress** detector appears in that listing (native seed/adopt), confirms Kaytoo did **not** log AD plugin `404`, waits one poll window, then requires one of: structured log **`insight_post`** (console sink), **`posted findings`**, or **`skipping heuristic detectors`** (native pipeline idle + healthy empty). With `kaytooOutput: console` in e2e values, proactive text appears as JSON logs (`insightText` on `insight_post`), not Slack/Matrix. If logs contain **`Anomaly`**, verify prints an optional note (typical when a graded `opensearch_anomaly` finding was summarized).

## Host Kaytoo (optional)

Expand Down
41 changes: 41 additions & 0 deletions e2e/cli.sh
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,47 @@ cmd_verify() {
e2e_log "expect topTalkersByBytes in logs"
logs_grep 'agent tool finished' || { kubectl -n "$ns" logs "$pod" -c kaytoo --tail=120; exit 1; }
logs_grep '"tool":"topTalkersByBytes"' || { kubectl -n "$ns" logs "$pod" -c kaytoo --tail=120; exit 1; }

e2e_log "OpenSearch Anomaly Detection API (expect HTTP 200, not 404)"
local ad_http os_base="${OS_URL%/}"
ad_http="$(curl -k -sS -o /dev/null -w '%{http_code}' -u "${OS_USER}:${OS_PASS}" \
-X POST "${os_base}/_plugins/_anomaly_detection/detectors/_search" \
-H 'Content-Type: application/json' \
-d '{"query":{"match_all":{}},"size":1}' 2>/dev/null || echo "000")"
[[ "$ad_http" == "200" ]] || e2e_die "AD detectors/_search returned HTTP ${ad_http} (plugin missing or auth?)"

e2e_log "OpenSearch AD: expect Kaytoo egress detector in list (seed/adopt)"
local ad_json
ad_json="$(curl -k -sS -u "${OS_USER}:${OS_PASS}" \
-X POST "${os_base}/_plugins/_anomaly_detection/detectors/_search" \
-H 'Content-Type: application/json' \
-d '{"query":{"match_all":{}},"size":50}' 2>/dev/null || true)"
echo "$ad_json" | grep -q 'Kaytoo flow egress' ||
e2e_die "AD detector list missing Kaytoo egress detector (native pipeline seed/adopt)"

e2e_log "Kaytoo logs: native AD must not report plugin 404"
if logs_grep 'OpenSearch Anomaly Detection plugin not available (404)' 25000; then
kubectl -n "$ns" logs "$pod" -c kaytoo --tail=200 >&2
e2e_die "Kaytoo logged AD plugin unavailable (404)"
fi

e2e_log "wait for scheduled insight poll (e2e uses pollIntervalSeconds=15)"
sleep 20

e2e_log "Kaytoo logs: console insight path (insight_post, posted findings, or heuristic skip when native idle)"
if logs_grep 'insight_post' 25000 || logs_grep '"msg":"posted findings"' 25000 || logs_grep 'skipping heuristic detectors' 25000; then
e2e_log "insight engine activity ok (console insight_post, posted findings, or native-idle heuristic skip)"
else
kubectl -n "$ns" logs "$pod" -c kaytoo --tail=200 >&2
e2e_die "no insight_post, posted findings, or heuristic-skip in recent logs (see tail above)"
fi

if logs_grep 'Anomaly' 25000; then
e2e_log "optional: Anomaly wording in logs (likely opensearch_anomaly insight path)"
else
e2e_log "optional: no Anomaly string in recent logs (normal if no graded AD hits this window)"
fi

e2e_log "OK verify"
}

Expand Down
8 changes: 7 additions & 1 deletion src/detectors/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ export type FindingSeverity = 'info' | 'low' | 'medium' | 'high';

export type Finding = {
id: string;
kind: 'egress_anomaly' | 'port_scan' | 'rare_destination' | 'opensearch_alert' | 'opensearch_anomaly';
kind:
| 'egress_anomaly'
| 'port_scan'
| 'rare_destination'
| 'opensearch_alert'
| 'opensearch_anomaly'
| 'elasticsearch_ml_anomaly';
severity: FindingSeverity;
title: string;
summary: string;
Expand Down
200 changes: 200 additions & 0 deletions src/elasticsearch/mlAnomalyLifecycle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import type { KaytooConfig } from '../config.js';
import { getLogger, logErr } from '../logging/logger.js';
import { getString, isRecord } from '../util/guards.js';
import {
KAYTOO_ES_DATAFEED_ID,
KAYTOO_ES_JOB_ID,
detectionIntervalMinutes,
} from '../insights/nativeAnomalyConstants.js';
import type { NativeAnomalyPipelineResult } from '../insights/nativeAnomalyTypes.js';

/**
 * Alias for the `Client` class of `@elastic/elasticsearch`, referenced through an
 * import type expression so this module has no static value import of the package.
 */
export type ElasticsearchMlClient = import('@elastic/elasticsearch').Client;

/**
 * Builds an Elasticsearch client from the `search` section of the Kaytoo config.
 *
 * The `@elastic/elasticsearch` package is loaded with a dynamic import when this
 * function is called.
 *
 * @param config Search settings: `url`, `username`, `password`, and `tlsInsecure`.
 * @returns A client using basic auth; when `config.tlsInsecure` is truthy, TLS
 *   certificate verification is turned off (`rejectUnauthorized: false`).
 */
export async function createElasticsearchMlClient(config: KaytooConfig['search']): Promise<ElasticsearchMlClient> {
  const elastic = await import('@elastic/elasticsearch');
  // Only add a `tls` key when verification is explicitly disabled.
  const tlsOverrides = config.tlsInsecure ? { tls: { rejectUnauthorized: false } } : {};
  const credentials = { username: config.username, password: config.password };
  return new elastic.Client({ node: config.url, auth: credentials, ...tlsOverrides });
}

/**
 * Formats the detection interval for a poll period as a minutes duration string
 * (for example `5m`), used as the ML job `bucket_span`.
 *
 * @param pollIntervalSeconds Poll interval handed to `detectionIntervalMinutes`.
 * @returns The interval in minutes followed by the `m` suffix.
 */
function bucketSpan(pollIntervalSeconds: number): string {
  const minutes = detectionIntervalMinutes(pollIntervalSeconds);
  return `${minutes}m`;
}

/**
 * Decides whether an ML job description looks like the flow-egress job:
 * time field `@timestamp` plus a detector that is `sum(bytesField)` over `srcIpField`.
 *
 * When the job carries a `datafeed_config` with a non-empty `indices` array, at least
 * one string entry must also relate to `indexPattern`: an exact match, or one being a
 * prefix of the other after the first `*` is removed. Jobs without that information
 * are accepted on the detector shape alone.
 *
 * @param job Untyped job object (anything that is not a record yields `false`).
 * @param indexPattern Index pattern the caller's flows live in.
 * @param srcIpField Expected `over_field_name` of the detector.
 * @param bytesField Expected `field_name` of the detector.
 * @returns `true` when the job matches the shape described above.
 */
export function mlJobMatchesEgressShape(job: unknown, indexPattern: string, srcIpField: string, bytesField: string): boolean {
  if (!isRecord(job)) return false;

  const dataDescription = job['data_description'];
  if (!isRecord(dataDescription)) return false;
  if (getString(dataDescription['time_field']) !== '@timestamp') return false;

  const analysisConfig = job['analysis_config'];
  if (!isRecord(analysisConfig)) return false;
  const detectors = analysisConfig['detectors'];
  if (!Array.isArray(detectors)) return false;

  let hasEgressDetector = false;
  for (const detector of detectors) {
    if (!isRecord(detector)) continue;
    if (getString(detector['function']) !== 'sum') continue;
    if (getString(detector['field_name']) !== bytesField) continue;
    if (getString(detector['over_field_name']) !== srcIpField) continue;
    hasEgressDetector = true;
    break;
  }
  if (!hasEgressDetector) return false;

  // No usable datafeed index list: the detector shape alone decides.
  const datafeedConfig = job['datafeed_config'];
  if (!isRecord(datafeedConfig)) return true;
  const indices = datafeedConfig['indices'];
  if (!Array.isArray(indices) || indices.length === 0) return true;

  // Only the first `*` is stripped on either side before the prefix comparison.
  const patternStem = indexPattern.replace('*', '');
  for (const candidate of indices) {
    if (typeof candidate !== 'string') continue;
    if (candidate === indexPattern) return true;
    if (indexPattern.startsWith(candidate.replace('*', ''))) return true;
    if (candidate.startsWith(patternStem)) return true;
  }
  return false;
}

/**
 * Best-effort start of every datafeed that belongs to `jobId`.
 *
 * The Get datafeeds API is addressed by datafeed id only, so all datafeeds are listed
 * and filtered on each datafeed's own `job_id` field. Passing `job_id` as a request
 * parameter is not part of that API and makes the lookup fail, which would leave the
 * datafeeds unstarted.
 *
 * Never throws: a failing lookup or a failing start (for example a datafeed that is
 * already running) is logged at debug level and otherwise ignored.
 *
 * @param client Elasticsearch client used for the ML calls.
 * @param jobId Anomaly detection job whose datafeeds should be started.
 * @param log Logger receiving the debug diagnostics.
 */
async function startDatafeedsForJob(client: ElasticsearchMlClient, jobId: string, log: ReturnType<typeof getLogger>): Promise<void> {
  try {
    const res = await client.ml.getDatafeeds({});
    const feeds = isRecord(res) && Array.isArray(res['datafeeds']) ? res['datafeeds'] : [];
    for (const f of feeds) {
      // Keep only datafeeds attached to the requested job.
      if (!isRecord(f) || getString(f['job_id']) !== jobId) continue;
      const id = getString(f['datafeed_id']);
      if (!id) continue;
      try {
        await client.ml.startDatafeed({ datafeed_id: id });
      } catch (e) {
        log.debug({ datafeedId: id, err: String(e) }, 'ml startDatafeed noop');
      }
    }
  } catch (e) {
    log.debug({ ...logErr(e) }, 'ml getDatafeeds for job');
  }
}

/**
 * Extracts the `error.type` string from a thrown Elasticsearch error object.
 *
 * Looks at `e.meta.body.error.type` first and uses it when non-empty; otherwise
 * falls back to `e.body.error.type`.
 *
 * @param e Caught value of unknown shape.
 * @returns The error type, or `''` when none can be found.
 */
function elasticsearchErrorType(e: unknown): string {
  if (!isRecord(e)) return '';

  // Reads `payload.error.type`, tolerating any missing or mistyped level.
  const typeFromBody = (payload: unknown): string => {
    if (!isRecord(payload)) return '';
    const errorNode = payload['error'];
    return isRecord(errorNode) ? getString(errorNode['type']) : '';
  };

  const meta = e['meta'];
  const fromMeta = isRecord(meta) ? typeFromBody(meta['body']) : '';
  return fromMeta || typeFromBody(e['body']);
}

/**
 * Tells whether a caught error means the resource was already present.
 *
 * @param e Caught value of unknown shape.
 * @returns `true` when the structured error type is `resource_already_exists_exception`,
 *   or when that marker appears in the error's string form.
 */
function resourceAlreadyExists(e: unknown): boolean {
  const marker = 'resource_already_exists_exception';
  return elasticsearchErrorType(e) === marker || String(e).includes(marker);
}

/**
 * Chooses at most one existing ML job to adopt.
 *
 * Every record in `jobs` with a non-empty `job_id` that passes
 * `mlJobMatchesEgressShape` is a candidate. The Kaytoo-managed job id wins when
 * present; otherwise the first id in `localeCompare` order is used. When several
 * jobs match, the choice and the remaining ids are logged at debug level.
 *
 * @param jobs Untyped job list.
 * @param indexPattern Index pattern forwarded to the shape check.
 * @param srcIpField Source IP field forwarded to the shape check.
 * @param bytesField Bytes field forwarded to the shape check.
 * @param log Logger for the tie-break note.
 * @returns A single-element array with the chosen job id, or `[]` when nothing matches.
 */
function pickMatchingJobIds(
  jobs: unknown[],
  indexPattern: string,
  srcIpField: string,
  bytesField: string,
  log: ReturnType<typeof getLogger>,
): string[] {
  const matchingIds: string[] = [];
  jobs.forEach((job) => {
    if (!isRecord(job)) return;
    const jobId = getString(job['job_id']);
    if (jobId && mlJobMatchesEgressShape(job, indexPattern, srcIpField, bytesField)) {
      matchingIds.push(jobId);
    }
  });

  // Rank 0 for the Kaytoo-managed job, 1 for everything else; ties fall back to id order.
  const rank = (id: string): number => (id === KAYTOO_ES_JOB_ID ? 0 : 1);
  const [winner, ...others] = [...matchingIds].sort(
    (left, right) => rank(left) - rank(right) || left.localeCompare(right),
  );
  if (winner === undefined) return [];

  if (others.length > 0) {
    log.debug(
      { chosenMlJobId: winner, otherMatchingMlJobIds: others },
      'Multiple ML jobs matched egress shape; using deterministic tie-break.',
    );
  }
  return [winner];
}

/**
 * Creates the Kaytoo-managed ML job and its datafeed.
 *
 * The job sums `bytesField` over `srcIpField` with a bucket span derived from the
 * poll interval; the datafeed reads `indexPattern` with a match-all query. A
 * "resource already exists" error from either call is treated as success.
 *
 * @param opts Client plus the index pattern, field names, and poll interval.
 * @param log Logger for warnings about failed calls.
 * @returns `null` when both resources exist afterwards (created or already present);
 *   otherwise a failed pipeline result carrying a warning message.
 */
async function createKaytooMlJobAndDatafeed(
  opts: {
    client: ElasticsearchMlClient;
    indexPattern: string;
    srcIpField: string;
    bytesField: string;
    pollIntervalSeconds: number;
  },
  log: ReturnType<typeof getLogger>,
): Promise<NativeAnomalyPipelineResult | null> {
  const failed = (warning: string): NativeAnomalyPipelineResult => ({
    ok: false,
    hasScopedSources: false,
    warning,
  });

  const jobRequest = {
    job_id: KAYTOO_ES_JOB_ID,
    description: 'Kaytoo-managed flow egress — sum bytes by source IP.',
    analysis_config: {
      bucket_span: bucketSpan(opts.pollIntervalSeconds),
      detectors: [{ function: 'sum', field_name: opts.bytesField, over_field_name: opts.srcIpField }],
    },
    data_description: { time_field: '@timestamp' },
  };

  try {
    await opts.client.ml.putJob(jobRequest as never);
  } catch (err) {
    // An existing job is fine; anything else aborts before the datafeed step.
    const tolerated = resourceAlreadyExists(err);
    if (!tolerated) {
      log.warn({ ...logErr(err) }, 'Elasticsearch ML putJob failed');
      return failed('Could not create Kaytoo Elasticsearch ML job (ML unavailable or insufficient permissions).');
    }
  }

  try {
    const datafeedRequest = {
      datafeed_id: KAYTOO_ES_DATAFEED_ID,
      job_id: KAYTOO_ES_JOB_ID,
      indices: [opts.indexPattern],
      query: { match_all: {} },
      scroll_size: 1000,
    };
    await opts.client.ml.putDatafeed(datafeedRequest as never);
  } catch (err) {
    const tolerated = resourceAlreadyExists(err);
    if (!tolerated) {
      log.warn({ ...logErr(err) }, 'Elasticsearch ML putDatafeed failed');
      return failed('Could not create Kaytoo ML datafeed.');
    }
  }

  return null;
}

/**
 * Makes sure an Elasticsearch ML anomaly job for flow egress exists and is running.
 *
 * Steps:
 * 1. List ML jobs and adopt one that already matches the egress shape.
 * 2. If none matches, create the Kaytoo-managed job and datafeed; a creation failure
 *    is returned as-is.
 * 3. Open each selected job and start its datafeeds (both best-effort).
 *
 * Never throws: unexpected errors are logged as a warning and reported through a
 * failed result.
 *
 * @param opts Client plus the index pattern, field names, and poll interval.
 * @returns On success `ok: true` with the selected job ids under `elasticsearch.jobIds`;
 *   otherwise `ok: false` with a warning.
 */
export async function ensureElasticsearchMlAnomalyPipeline(opts: {
  client: ElasticsearchMlClient;
  indexPattern: string;
  srcIpField: string;
  bytesField: string;
  pollIntervalSeconds: number;
}): Promise<NativeAnomalyPipelineResult> {
  const log = getLogger({ component: 'insights.nativeAnomaly' });
  try {
    const list = await opts.client.ml.getJobs({});
    const jobs = isRecord(list) && Array.isArray(list['jobs']) ? list['jobs'] : [];
    const adopted = pickMatchingJobIds(jobs, opts.indexPattern, opts.srcIpField, opts.bytesField, log);
    const createFailed = adopted.length === 0 ? await createKaytooMlJobAndDatafeed(opts, log) : null;
    if (createFailed) return createFailed;
    const jobIds = adopted.length > 0 ? adopted : [KAYTOO_ES_JOB_ID];

    for (const jid of jobIds) {
      try {
        await opts.client.ml.openJob({ job_id: jid });
      } catch (e) {
        // Opening stays best-effort (the job may already be open), but the error is
        // recorded so real failures such as missing privileges can be diagnosed.
        log.debug({ mlJobId: jid, ...logErr(e) }, 'ml openJob noop');
      }
      await startDatafeedsForJob(opts.client, jid, log);
    }

    return { ok: true, hasScopedSources: jobIds.length > 0, elasticsearch: { jobIds } };
  } catch (e) {
    log.warn({ ...logErr(e) }, 'Elasticsearch ML pipeline ensure failed');
    return { ok: false, hasScopedSources: false, warning: 'Elasticsearch ML pipeline ensure threw.' };
  }
}
73 changes: 73 additions & 0 deletions src/insights/elasticsearchDetections.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import type { Finding } from '../detectors/types.js';
import type { ElasticsearchMlClient } from '../elasticsearch/mlAnomalyLifecycle.js';
import type { DetectionFetchResult } from './opensearchDetections.js';
import { getNumber, getString, isRecord } from '../util/guards.js';

/**
 * Converts the `timestamp` of an ML record to an ISO-8601 string.
 *
 * - Finite numbers are read as epoch milliseconds, except values below 1e12, which
 *   are treated as epoch seconds.
 * - Strings are parsed with `Date.parse`; an unparseable string is returned unchanged.
 *
 * @param rec ML record as an untyped key/value object.
 * @returns The ISO timestamp; `''` when the field is missing or is a number that
 *   cannot be represented as a Date.
 */
function mlRecordTimestampIso(rec: Record<string, unknown>): string {
  const v = rec['timestamp'];
  if (typeof v === 'number' && Number.isFinite(v)) {
    // ML APIs normally return epoch ms; values below ~1e12 are treated as seconds.
    const ms = v < 1e12 ? v * 1000 : v;
    const date = new Date(ms);
    // Finite values beyond the Date range produce an invalid Date, and toISOString()
    // throws RangeError on those; report "no timestamp" instead of throwing.
    return Number.isNaN(date.getTime()) ? '' : date.toISOString();
  }
  const s = getString(v);
  if (!s) return '';
  const ms = Date.parse(s);
  return Number.isNaN(ms) ? s : new Date(ms).toISOString();
}

/**
 * Maps one Elasticsearch ML anomaly record to a `Finding`.
 *
 * The score is the larger of `record_score` and `initial_record_score`: 90 and above
 * is `high`, 50 and above is `medium`, anything else is `low`. The finding id combines
 * job id, timestamp, `over_field_value`, and the score with two decimals. A record
 * without a usable timestamp is stamped with the Unix epoch.
 *
 * @param jobId ML job the record came from.
 * @param rec Raw ML record; stored unchanged under `evidence.source`.
 * @returns A finding of kind `elasticsearch_ml_anomaly` whose window starts and ends
 *   at the record timestamp.
 */
function recordToFinding(jobId: string, rec: Record<string, unknown>): Finding {
  const currentScore = getNumber(rec['record_score']);
  const initialScore = getNumber(rec['initial_record_score']);
  const score = Math.max(currentScore, initialScore);

  let severity: 'high' | 'medium' | 'low' = 'low';
  if (score >= 90) {
    severity = 'high';
  } else if (score >= 50) {
    severity = 'medium';
  }

  const over = getString(rec['over_field_value']);
  const recordTime = mlRecordTimestampIso(rec);
  const at = recordTime || new Date(0).toISOString();

  const evidence: Record<string, unknown> = { jobId, source: rec };
  if (over) {
    evidence['contributingSrcIps'] = [over];
  }

  const scoreLabel = score.toFixed(1);
  let title = `ML anomaly (score ${scoreLabel})`;
  if (over) {
    title = `ML anomaly: ${over} (score ${scoreLabel})`;
  }

  return {
    id: `es-ml:${jobId}:${at}:${over}:${score.toFixed(2)}`,
    kind: 'elasticsearch_ml_anomaly',
    severity,
    title,
    summary: 'Elasticsearch machine learning reported an anomalous record.',
    evidence,
    window: { from: at, to: at },
  };
}

/**
 * Fetches recent ML anomaly records for the given jobs and turns them into findings.
 *
 * For every job id, up to 20 records between `now - minutesBack` minutes and `now`
 * are requested in descending order. Records whose score (the larger of
 * `record_score` and `initial_record_score`) is zero or negative are dropped.
 *
 * @param opts.client Elasticsearch client used for `ml.getRecords`.
 * @param opts.jobIds Jobs to query; an empty list short-circuits without any request.
 * @param opts.now End of the query window.
 * @param opts.minutesBack Length of the query window in minutes.
 * @returns `ok: true` with the findings (`healthyEmpty` is `true` only when jobs were
 *   queried and produced nothing), or `ok: false` with a warning when a request throws.
 */
export async function fetchElasticsearchMlAnomalyFindings(opts: {
  client: ElasticsearchMlClient;
  jobIds: string[];
  now: Date;
  minutesBack: number;
}): Promise<DetectionFetchResult> {
  const { client, jobIds, now, minutesBack } = opts;
  if (jobIds.length === 0) {
    return { ok: true, findings: [], healthyEmpty: false };
  }

  const windowEndMs = now.getTime();
  const start = new Date(windowEndMs - minutesBack * 60_000).toISOString();
  const end = new Date(windowEndMs).toISOString();
  const collected: Finding[] = [];

  try {
    for (const jobId of jobIds) {
      const request = { job_id: jobId, start, end, desc: true, size: 20 };
      const response = await client.ml.getRecords(request as never);
      if (!isRecord(response)) continue;
      const records = response['records'];
      if (!Array.isArray(records)) continue;

      for (const candidate of records) {
        if (!isRecord(candidate)) continue;
        const score = Math.max(getNumber(candidate['record_score']), getNumber(candidate['initial_record_score']));
        // Skip only non-positive scores.
        if (score <= 0) continue;
        collected.push(recordToFinding(jobId, candidate));
      }
    }
    return { ok: true, findings: collected, healthyEmpty: collected.length === 0 };
  } catch (e) {
    return { ok: false, findings: [], warning: `Elasticsearch ML getRecords failed: ${String(e)}` };
  }
}
Loading