From 03afad851a3e1a75285604ba934e7ef4ed0aa584 Mon Sep 17 00:00:00 2001 From: Madison Grubb Date: Wed, 13 May 2026 22:54:50 -0400 Subject: [PATCH 1/5] feat(insights): OpenSearch AD and Elasticsearch ML anomaly pipeline ## Added - Idempotent OpenSearch AD discover/adopt/seed via transport and scoped anomaly result queries. - Elasticsearch ML job, datafeed, and `getRecords`-backed findings. - `elasticsearch_ml_anomaly` finding kind and enrichment when source IPs are present. ## Changed - Insight engine ensures the native pipeline after field mapping; polls OS or ES for anomalies and alerts on the existing path. - Heuristic egress polling skips only when native pipeline is healthy and native fetches report a healthy empty window. - OpenSearch AD/alerting searches accept string JSON bodies using shared `parseJsonOrNull` (shard counts and hits stay consistent). ## Fixed - Heuristics no longer stay suppressed when native anomaly setup is not actually ready. --- src/detectors/types.ts | 8 +- src/elasticsearch/mlAnomalyLifecycle.ts | 152 ++++++++ src/insights/elasticsearchDetections.ts | 61 ++++ src/insights/engine.ts | 66 ++-- src/insights/enrichEgressEvidence.ts | 5 +- src/insights/nativeAnomalyConstants.ts | 17 + src/insights/nativeAnomalyPipeline.ts | 47 +++ src/insights/nativeAnomalyTypes.ts | 7 + src/insights/nativeDetections.ts | 52 +++ src/insights/opensearchDetections.ts | 66 +++- src/insights/pollUtils.ts | 9 +- src/opensearch/adLifecycle.ts | 224 ++++++++++++ test/elasticsearch.mlAnomalyLifecycle.test.ts | 221 ++++++++++++ test/elasticsearchDetections.test.ts | 117 ++++++ test/enrichEgressEvidence.test.ts | 25 ++ test/insightEngine.poll.test.ts | 28 +- test/nativeAnomalyConstants.test.ts | 11 + test/nativeAnomalyPipeline.test.ts | 89 +++++ test/nativeDetections.test.ts | 92 +++++ test/opensearch.adLifecycle.test.ts | 335 ++++++++++++++++++ test/opensearchDetections.test.ts | 143 ++++++++ test/pollUtils.test.ts | 14 +- 22 files changed, 1742 insertions(+), 47 deletions(-) create mode 100644 src/elasticsearch/mlAnomalyLifecycle.ts create mode 100644 src/insights/elasticsearchDetections.ts create mode 100644 src/insights/nativeAnomalyConstants.ts create mode 100644 src/insights/nativeAnomalyPipeline.ts create mode 100644 src/insights/nativeAnomalyTypes.ts create mode 100644 src/insights/nativeDetections.ts create mode 100644 src/opensearch/adLifecycle.ts create mode 100644 test/elasticsearch.mlAnomalyLifecycle.test.ts create mode 100644 test/elasticsearchDetections.test.ts create mode 100644 test/nativeAnomalyConstants.test.ts create mode 100644 test/nativeAnomalyPipeline.test.ts create mode 100644 test/nativeDetections.test.ts create mode 100644 test/opensearch.adLifecycle.test.ts diff --git a/src/detectors/types.ts b/src/detectors/types.ts index 353acb6..d31eaf1 100644 --- a/src/detectors/types.ts +++ b/src/detectors/types.ts @@ -2,7 +2,13 @@ export type FindingSeverity = 'info' | 'low' | 'medium' | 'high'; export type Finding = { id: string; - kind: 'egress_anomaly' | 'port_scan' | 'rare_destination' | 'opensearch_alert' | 'opensearch_anomaly'; + kind: + | 'egress_anomaly' + | 'port_scan' + | 'rare_destination' + | 'opensearch_alert' + | 'opensearch_anomaly' + | 'elasticsearch_ml_anomaly'; severity: FindingSeverity; title: string; summary: string; diff --git a/src/elasticsearch/mlAnomalyLifecycle.ts b/src/elasticsearch/mlAnomalyLifecycle.ts new file mode 100644 index 0000000..16cfe0c --- /dev/null +++ b/src/elasticsearch/mlAnomalyLifecycle.ts @@ -0,0 +1,152 @@ +import type { KaytooConfig } from '../config.js'; +import { getLogger, logErr } from '../logging/logger.js'; +import { getString, isRecord } from '../util/guards.js'; +import { + KAYTOO_ES_DATAFEED_ID, + KAYTOO_ES_JOB_ID, + detectionIntervalMinutes, +} from '../insights/nativeAnomalyConstants.js'; +import type { NativeAnomalyPipelineResult } from '../insights/nativeAnomalyTypes.js'; + +export type ElasticsearchMlClient = import('@elastic/elasticsearch').Client; + +export async function createElasticsearchMlClient(config: KaytooConfig['search']): Promise { + const { Client } = await import('@elastic/elasticsearch'); + return new Client({ + node: config.url, + auth: { username: config.username, password: config.password }, + ...(config.tlsInsecure ? { tls: { rejectUnauthorized: false } } : {}), + }); +} + +function bucketSpan(pollIntervalSeconds: number): string { + return `${detectionIntervalMinutes(pollIntervalSeconds)}m`; +} + +export function mlJobMatchesEgressShape(job: unknown, indexPattern: string, srcIpField: string, bytesField: string): boolean { + if (!isRecord(job)) return false; + const dc = job['data_description']; + if (!isRecord(dc) || getString(dc['time_field']) !== '@timestamp') return false; + const ac = job['analysis_config']; + if (!isRecord(ac)) return false; + const dets = ac['detectors']; + if (!Array.isArray(dets)) return false; + const ok = dets.some((d) => { + if (!isRecord(d)) return false; + return ( + getString(d['function']) === 'sum' && + getString(d['field_name']) === bytesField && + getString(d['over_field_name']) === srcIpField + ); + }); + if (!ok) return false; + const dfs = job['datafeed_config']; + if (!isRecord(dfs)) return true; + const ix = dfs['indices']; + if (!Array.isArray(ix) || ix.length === 0) return true; + return ix.some( + (i) => + typeof i === 'string' && + (i === indexPattern || indexPattern.startsWith(i.replace('*', '')) || i.startsWith(indexPattern.replace('*', ''))), + ); +} + +async function startDatafeedsForJob(client: ElasticsearchMlClient, jobId: string, log: ReturnType): Promise { + try { + const res = await client.ml.getDatafeeds({ job_id: jobId } as never); + const feeds = isRecord(res) && Array.isArray(res['datafeeds']) ? res['datafeeds'] : []; + for (const f of feeds) { + if (!isRecord(f)) continue; + const id = getString(f['datafeed_id']); + if (id) await client.ml.startDatafeed({ datafeed_id: id }).catch((e) => log.debug({ datafeedId: id, err: String(e) }, 'ml startDatafeed noop')); + } + } catch (e) { + log.debug({ ...logErr(e) }, 'ml getDatafeeds for job'); + } +} + +function pickMatchingJobIds(jobs: unknown[], indexPattern: string, srcIpField: string, bytesField: string): string[] { + const matches: { id: string; kaytoo: number }[] = []; + for (const j of jobs) { + if (!isRecord(j)) continue; + const id = getString(j['job_id']); + if (!id || !mlJobMatchesEgressShape(j, indexPattern, srcIpField, bytesField)) continue; + matches.push({ id, kaytoo: id === KAYTOO_ES_JOB_ID ? 0 : 1 }); + } + matches.sort((a, b) => a.kaytoo - b.kaytoo || a.id.localeCompare(b.id)); + return matches.length ? [matches[0]!.id] : []; +} + +function resourceAlreadyExists(e: unknown): boolean { + return String(e).includes('resource_already_exists_exception'); +} + +export async function ensureElasticsearchMlAnomalyPipeline(opts: { + client: ElasticsearchMlClient; + indexPattern: string; + srcIpField: string; + bytesField: string; + pollIntervalSeconds: number; +}): Promise { + const log = getLogger({ component: 'insights.nativeAnomaly' }); + try { + const list = await opts.client.ml.getJobs({}); + const jobs = isRecord(list) && Array.isArray(list['jobs']) ? list['jobs'] : []; + let jobIds = pickMatchingJobIds(jobs, opts.indexPattern, opts.srcIpField, opts.bytesField); + + if (jobIds.length === 0) { + const span = bucketSpan(opts.pollIntervalSeconds); + try { + await opts.client.ml.putJob({ + job_id: KAYTOO_ES_JOB_ID, + description: 'Kaytoo-managed flow egress — sum bytes by source IP.', + analysis_config: { + bucket_span: span, + detectors: [{ function: 'sum', field_name: opts.bytesField, over_field_name: opts.srcIpField }], + }, + data_description: { time_field: '@timestamp' }, + } as never); + } catch (e) { + if (!resourceAlreadyExists(e)) { + log.warn({ ...logErr(e) }, 'Elasticsearch ML putJob failed'); + return { + ok: false, + hasScopedSources: false, + warning: 'Could not create Kaytoo Elasticsearch ML job (ML unavailable or insufficient permissions).', + }; + } + } + + try { + await opts.client.ml.putDatafeed({ + datafeed_id: KAYTOO_ES_DATAFEED_ID, + job_id: KAYTOO_ES_JOB_ID, + indices: [opts.indexPattern], + query: { match_all: {} }, + scroll_size: 1000, + } as never); + } catch (e) { + if (!resourceAlreadyExists(e)) { + log.warn({ ...logErr(e) }, 'Elasticsearch ML putDatafeed failed'); + return { ok: false, hasScopedSources: false, warning: 'Could not create Kaytoo ML datafeed.' }; + } + } + + jobIds = [KAYTOO_ES_JOB_ID]; + } + + for (const jid of jobIds) { + try { + await opts.client.ml.openJob({ job_id: jid }); + } catch { + // already open + } + await startDatafeedsForJob(opts.client, jid, log); + } + + return { ok: true, hasScopedSources: jobIds.length > 0, elasticsearch: { jobIds } }; + } catch (e) { + log.warn({ ...logErr(e) }, 'Elasticsearch ML pipeline ensure failed'); + return { ok: false, hasScopedSources: false, warning: 'Elasticsearch ML pipeline ensure threw.' }; + } +} diff --git a/src/insights/elasticsearchDetections.ts b/src/insights/elasticsearchDetections.ts new file mode 100644 index 0000000..59ae4d9 --- /dev/null +++ b/src/insights/elasticsearchDetections.ts @@ -0,0 +1,61 @@ +import type { Finding } from '../detectors/types.js'; +import type { ElasticsearchMlClient } from '../elasticsearch/mlAnomalyLifecycle.js'; +import type { DetectionFetchResult } from './opensearchDetections.js'; +import { getNumber, getString, isRecord } from '../util/guards.js'; + +function recordToFinding(jobId: string, rec: Record): Finding { + const score = Math.max(getNumber(rec['record_score']), getNumber(rec['initial_record_score'])); + const severity = score >= 90 ? 'high' : score >= 50 ? 'medium' : 'low'; + const over = getString(rec['over_field_value']); + const ts = getString(rec['timestamp']); + const id = `es-ml:${jobId}:${ts}:${over}:${score.toFixed(2)}`; + const t = ts || new Date(0).toISOString(); + const evidence: Record = { jobId, source: rec }; + if (over) evidence['contributingSrcIps'] = [over]; + return { + id, + kind: 'elasticsearch_ml_anomaly', + severity, + title: over ? `ML anomaly: ${over} (score ${score.toFixed(1)})` : `ML anomaly (score ${score.toFixed(1)})`, + summary: 'Elasticsearch machine learning reported an anomalous record.', + evidence, + window: { from: t, to: t }, + }; +} + +export async function fetchElasticsearchMlAnomalyFindings(opts: { + client: ElasticsearchMlClient; + jobIds: string[]; + now: Date; + minutesBack: number; +}): Promise { + if (opts.jobIds.length === 0) return { ok: true, findings: [], healthyEmpty: false }; + + const findings: Finding[] = []; + const to = opts.now.getTime(); + const fromMs = to - opts.minutesBack * 60_000; + const start = new Date(fromMs).toISOString(); + const end = new Date(to).toISOString(); + + try { + for (const jobId of opts.jobIds) { + const res = await opts.client.ml.getRecords({ + job_id: jobId, + start, + end, + desc: true, + size: 20, + } as never); + const recs = isRecord(res) && Array.isArray(res['records']) ? res['records'] : []; + for (const r of recs) { + if (!isRecord(r)) continue; + const score = Math.max(getNumber(r['record_score']), getNumber(r['initial_record_score'])); + if (score <= 0) continue; + findings.push(recordToFinding(jobId, r)); + } + } + return { ok: true, findings, healthyEmpty: findings.length === 0 }; + } catch (e) { + return { ok: false, findings: [], warning: `Elasticsearch ML getRecords failed: ${String(e)}` }; + } +} diff --git a/src/insights/engine.ts b/src/insights/engine.ts index 4686faf..1eede48 100644 --- a/src/insights/engine.ts +++ b/src/insights/engine.ts @@ -14,14 +14,14 @@ import type { InsightSink } from '../notify/insightSink.js'; import { DedupeStore } from '../state/dedupe.js'; import { thrownMessage } from '../util/guards.js'; import { windowRelative } from '../util/time.js'; -import { - fetchOpenSearchAdFindings, - fetchOpenSearchAlertingFindings, - type DetectionFetchResult, -} from './opensearchDetections.js'; +import type { DetectionFetchResult } from './opensearchDetections.js'; import { selectNovelInsightPostBatch, shouldSkipHeuristicPoll } from './pollUtils.js'; import { enrichInsightsEgressBatch } from './enrichEgressEvidence.js'; import { egressInsightWindows } from './egressInsightPolicy.js'; +import { ensureNativeAnomalyPipeline } from './nativeAnomalyPipeline.js'; +import { fetchNativeAlertFindings, fetchNativeAnomalyFindings } from './nativeDetections.js'; +import type { NativeAnomalyPipelineResult } from './nativeAnomalyTypes.js'; +import type { ElasticsearchMlClient } from '../elasticsearch/mlAnomalyLifecycle.js'; function detectionFetchFailure(e: unknown): DetectionFetchResult { return { ok: false, findings: [], warning: thrownMessage(e) }; @@ -57,6 +57,23 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi 'resolved opensearch field mapping', ); + let nativePipeline: NativeAnomalyPipelineResult = { ok: false, hasScopedSources: false }; + let esMlClient: ElasticsearchMlClient | null = null; + try { + const ensured = await ensureNativeAnomalyPipeline({ + backend: config.search.backend, + search: config.search, + searchClient: client, + indexPattern: config.search.indexPattern, + fields, + pollIntervalSeconds: config.behavior.pollIntervalSeconds, + }); + nativePipeline = ensured.pipeline; + esMlClient = ensured.esMlClient; + } catch (e) { + log.warn({ ...logErr(e) }, 'native anomaly pipeline ensure failed; continuing without scoped native anomaly'); + } + const llm = createOpenAiCompatClient({ ...config.llm, includeDebugBodies: config.logging.includeDebugBodies, @@ -66,6 +83,8 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi let timer: NodeJS.Timeout | undefined; let inFlight = false; + const nativePipelineReady = nativePipeline.ok === true && nativePipeline.hasScopedSources === true; + const scheduleNext = (): void => { if (controller.signal.aborted) return; timer = setTimeout(() => void pollOnce(), config.behavior.pollIntervalSeconds * 1000); @@ -80,23 +99,22 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi const now = new Date(); - const [alerting, ad] = - config.search.backend === 'opensearch' - ? await Promise.all([ - fetchOpenSearchAlertingFindings({ - client, - now, - minutesBack: config.behavior.pollIntervalSeconds / 60 + 5, - }).catch(detectionFetchFailure), - fetchOpenSearchAdFindings({ - client, - minutesBack: config.behavior.pollIntervalSeconds / 60 + 10, - }).catch(detectionFetchFailure), - ]) - : ([{ ok: true, findings: [], healthyEmpty: false } as DetectionFetchResult, { ok: true, findings: [], healthyEmpty: false } as DetectionFetchResult] satisfies [ - DetectionFetchResult, - DetectionFetchResult, - ]); + const [alerting, ad] = await Promise.all([ + fetchNativeAlertFindings({ + backend: config.search.backend, + client, + now, + minutesBack: config.behavior.pollIntervalSeconds / 60 + 5, + }).catch(detectionFetchFailure), + fetchNativeAnomalyFindings({ + backend: config.search.backend, + searchClient: client, + esMlClient, + pipeline: nativePipeline, + now, + minutesBack: config.behavior.pollIntervalSeconds / 60 + 10, + }).catch(detectionFetchFailure), + ]); if (!alerting.ok && alerting.warning && shouldWarnDegraded('alerting')) { log.warn({ degradedKey: 'alerting', degradedMsg: alerting.warning }, 'insights degraded'); @@ -111,8 +129,8 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi return; } - if (shouldSkipHeuristicPoll(alerting, ad)) { - log.debug('skipping heuristic detectors: alerting and AD healthy empty'); + if (shouldSkipHeuristicPoll(alerting, ad, nativePipelineReady)) { + log.debug('skipping heuristic detectors: alerting and AD healthy empty with native pipeline ready'); return; } diff --git a/src/insights/enrichEgressEvidence.ts b/src/insights/enrichEgressEvidence.ts index cc8770c..6071469 100644 --- a/src/insights/enrichEgressEvidence.ts +++ b/src/insights/enrichEgressEvidence.ts @@ -19,7 +19,8 @@ export async function enrichEgressFinding(opts: { finding: Finding; }): Promise { const { finding, client, index, fields } = opts; - if (finding.kind !== 'egress_anomaly') return finding; + if (finding.kind !== 'egress_anomaly' && finding.kind !== 'opensearch_anomaly' && finding.kind !== 'elasticsearch_ml_anomaly') + return finding; const raw = finding.evidence['contributingSrcIps']; if (!Array.isArray(raw) || raw.some((x) => typeof x !== 'string')) return finding; const srcIpList = raw.slice(0, MAX_SRC_TERMS); @@ -157,7 +158,7 @@ export async function enrichInsightsEgressBatch(opts: { }): Promise { return Promise.all( opts.findings.map(async (f) => { - if (f.kind !== 'egress_anomaly') return f; + if (f.kind !== 'egress_anomaly' && f.kind !== 'opensearch_anomaly' && f.kind !== 'elasticsearch_ml_anomaly') return f; try { return await enrichEgressFinding({ client: opts.client, diff --git a/src/insights/nativeAnomalyConstants.ts b/src/insights/nativeAnomalyConstants.ts new file mode 100644 index 0000000..6bab511 --- /dev/null +++ b/src/insights/nativeAnomalyConstants.ts @@ -0,0 +1,17 @@ +/** Stable Kaytoo-owned OpenSearch AD detector display name (search/list match). */ +export const KAYTOO_OS_DETECTOR_NAME = 'Kaytoo flow egress by source'; + +/** Elasticsearch ML job + datafeed id prefix (single job). */ +export const KAYTOO_ES_JOB_ID = 'kaytoo-flow-egress-by-src'; + +export const KAYTOO_ES_DATAFEED_ID = `${KAYTOO_ES_JOB_ID}-datafeed`; + +/** Dedicated OS AD result index (code constant; avoids noisy shared default indices). */ +export const KAYTOO_OS_RESULT_INDEX = 'kaytoo-ad-flow-results'; + +export const KAYTOO_OS_FEATURE_NAME = 'kaytoo_sum_bytes'; + +export function detectionIntervalMinutes(pollIntervalSeconds: number): number { + const m = Math.max(5, Math.ceil(pollIntervalSeconds / 60)); + return Math.min(m, 60); +} diff --git a/src/insights/nativeAnomalyPipeline.ts b/src/insights/nativeAnomalyPipeline.ts new file mode 100644 index 0000000..ffcde25 --- /dev/null +++ b/src/insights/nativeAnomalyPipeline.ts @@ -0,0 +1,47 @@ +import type { KaytooConfig } from '../config.js'; +import type { FieldPreference } from '../opensearch/fieldCaps.js'; +import type { SearchClient } from '../search/types.js'; +import { ensureOpenSearchAnomalyPipeline } from '../opensearch/adLifecycle.js'; +import { + createElasticsearchMlClient, + ensureElasticsearchMlAnomalyPipeline, + type ElasticsearchMlClient, +} from '../elasticsearch/mlAnomalyLifecycle.js'; +import type { NativeAnomalyPipelineResult } from './nativeAnomalyTypes.js'; + +export async function ensureNativeAnomalyPipeline(opts: { + backend: KaytooConfig['search']['backend']; + search: KaytooConfig['search']; + searchClient: SearchClient; + indexPattern: string; + fields: FieldPreference; + pollIntervalSeconds: number; +}): Promise<{ pipeline: NativeAnomalyPipelineResult; esMlClient: ElasticsearchMlClient | null }> { + if (opts.backend === 'opensearch') { + const pipeline = await ensureOpenSearchAnomalyPipeline({ + client: opts.searchClient, + indexPattern: opts.indexPattern, + srcIpField: opts.fields.srcIpField, + bytesField: opts.fields.bytesField, + pollIntervalSeconds: opts.pollIntervalSeconds, + }); + return { pipeline, esMlClient: null }; + } + + try { + const esMlClient = await createElasticsearchMlClient(opts.search); + const pipeline = await ensureElasticsearchMlAnomalyPipeline({ + client: esMlClient, + indexPattern: opts.indexPattern, + srcIpField: opts.fields.srcIpField, + bytesField: opts.fields.bytesField, + pollIntervalSeconds: opts.pollIntervalSeconds, + }); + return { pipeline, esMlClient }; + } catch { + return { + pipeline: { ok: false, hasScopedSources: false, warning: 'Elasticsearch ML client unavailable.' }, + esMlClient: null, + }; + } +} diff --git a/src/insights/nativeAnomalyTypes.ts b/src/insights/nativeAnomalyTypes.ts new file mode 100644 index 0000000..0f2e508 --- /dev/null +++ b/src/insights/nativeAnomalyTypes.ts @@ -0,0 +1,7 @@ +export type NativeAnomalyPipelineResult = { + ok: boolean; + hasScopedSources: boolean; + opensearch?: { detectorIds: string[] }; + elasticsearch?: { jobIds: string[] }; + warning?: string; +}; diff --git a/src/insights/nativeDetections.ts b/src/insights/nativeDetections.ts new file mode 100644 index 0000000..2914e36 --- /dev/null +++ b/src/insights/nativeDetections.ts @@ -0,0 +1,52 @@ +import type { KaytooConfig } from '../config.js'; +import type { SearchClient } from '../search/types.js'; +import type { ElasticsearchMlClient } from '../elasticsearch/mlAnomalyLifecycle.js'; +import type { NativeAnomalyPipelineResult } from './nativeAnomalyTypes.js'; +import { + fetchOpenSearchAdFindings, + fetchOpenSearchAlertingFindings, + type DetectionFetchResult, +} from './opensearchDetections.js'; +import { fetchElasticsearchMlAnomalyFindings } from './elasticsearchDetections.js'; + +export async function fetchNativeAlertFindings(opts: { + backend: KaytooConfig['search']['backend']; + client: SearchClient; + now: Date; + minutesBack: number; +}): Promise { + if (opts.backend !== 'opensearch') { + return { ok: true, findings: [], healthyEmpty: false }; + } + return fetchOpenSearchAlertingFindings({ + client: opts.client, + now: opts.now, + minutesBack: opts.minutesBack, + }); +} + +export async function fetchNativeAnomalyFindings(opts: { + backend: KaytooConfig['search']['backend']; + searchClient: SearchClient; + esMlClient: ElasticsearchMlClient | null; + pipeline: NativeAnomalyPipelineResult; + now: Date; + minutesBack: number; +}): Promise { + if (opts.backend === 'opensearch') { + return fetchOpenSearchAdFindings({ + client: opts.searchClient, + minutesBack: opts.minutesBack, + detectorIds: opts.pipeline.opensearch?.detectorIds ?? [], + }); + } + if (opts.backend === 'elasticsearch' && opts.esMlClient && opts.pipeline.elasticsearch?.jobIds?.length) { + return fetchElasticsearchMlAnomalyFindings({ + client: opts.esMlClient, + jobIds: opts.pipeline.elasticsearch.jobIds, + now: opts.now, + minutesBack: opts.minutesBack, + }); + } + return { ok: true, findings: [], healthyEmpty: false }; +} diff --git a/src/insights/opensearchDetections.ts b/src/insights/opensearchDetections.ts index 597ef9e..225939b 100644 --- a/src/insights/opensearchDetections.ts +++ b/src/insights/opensearchDetections.ts @@ -3,6 +3,7 @@ import type { Finding } from '../detectors/types.js'; import { getNumber, getString, isRecord } from '../util/guards.js'; import { getLogger } from '../logging/logger.js'; import { parseJsonOrNull } from '../util/json.js'; +import { KAYTOO_OS_RESULT_INDEX } from './nativeAnomalyConstants.js'; export type DetectionFetchResult = { ok: boolean; @@ -13,13 +14,21 @@ export type DetectionFetchResult = { }; const ALERT_INDEX_PATTERNS = ['.opensearch-alerting-alerts*', '.opendistro-alerting-alerts*']; -const AD_RESULT_INDEX_PATTERNS = ['.opensearch-anomaly-results*', '.opendistro-anomaly-results*']; +const AD_RESULT_INDEX_PATTERNS = [ + `${KAYTOO_OS_RESULT_INDEX}*`, + '.opensearch-anomaly-results*', + '.opendistro-anomaly-results*', +]; const detectionsLog = getLogger({ component: 'insights.opensearchDetections' }); function shardsTotal(body: unknown): number { - if (!body || typeof body !== 'object') return 0; - const shards = (body as Record)['_shards']; + const normalized = + typeof body === 'string' + ? parseJsonOrNull({ raw: body, context: 'opensearch.search.body_shards' }) + : body; + if (!normalized || typeof normalized !== 'object') return 0; + const shards = (normalized as Record)['_shards']; if (!shards || typeof shards !== 'object') return 0; return getNumber((shards as Record)['total']); } @@ -73,12 +82,22 @@ export async function fetchOpenSearchAlertingFindings(opts: { export async function fetchOpenSearchAdFindings(opts: { client: SearchClient; minutesBack: number; + /** Empty array: skip AD (no scoped detectors). Omitted: all detectors (legacy). */ + detectorIds?: string[]; }): Promise { + if (opts.detectorIds && opts.detectorIds.length === 0) { + return { ok: true, findings: [], healthyEmpty: false }; + } + + const detectorFilter = + opts.detectorIds && opts.detectorIds.length > 0 ? [{ terms: { detector_id: opts.detectorIds } }] : []; + const query = { size: 20, query: { bool: { filter: [ + ...detectorFilter, { range: { execution_end_time: { gte: `now-${opts.minutesBack}m`, lt: 'now' } } }, { range: { anomaly_grade: { gt: 0 } } }, ], @@ -119,7 +138,7 @@ export async function fetchOpenSearchAdFindings(opts: { type Hit = { _id?: unknown; _index?: unknown; _source?: unknown }; function getHits(body: unknown): Hit[] { - const normalized: unknown = + const normalized = typeof body === 'string' ? parseJsonOrNull({ raw: body, context: 'opensearch.search.body_string', log: detectionsLog }) : body; @@ -149,21 +168,52 @@ function alertHitToFinding(hit: Hit): Finding { }; } +function collectEntityValues(src: Record): string[] { + const ent = src['entity']; + if (!Array.isArray(ent)) return []; + return ent.flatMap((e) => { + const v = isRecord(e) ? getString(e['value']) : ''; + return v ? [v] : []; + }); +} + +function adWindow(src: Record): { from: string; to: string } { + const from = + getString(src['execution_start_time']) || + getString(src['data_start_time']) || + getString(src['start_time']) || + new Date(0).toISOString(); + const to = + getString(src['execution_end_time']) || + getString(src['data_end_time']) || + getString(src['end_time']) || + new Date().toISOString(); + return { from, to }; +} + function adHitToFinding(hit: Hit): Finding { const src = isRecord(hit._source) ? hit._source : {}; const id = typeof hit._id === 'string' ? hit._id : JSON.stringify({ i: hit._index, id: hit._id }); const grade = getNumber(src['anomaly_grade']); const confidence = getNumber(src['confidence']); const severity = grade >= 0.9 ? 'high' : grade >= 0.7 ? 'medium' : 'low'; + const detectorName = getString(src['detector_name']) || getString(src['name']) || 'detector'; + const entities = collectEntityValues(src); + const title = + entities.length > 0 + ? `Anomaly: ${detectorName} — ${entities.slice(0, 2).join(', ')}${entities.length > 2 ? '…' : ''}` + : `Anomaly: ${detectorName} (grade ${grade.toFixed(2)})`; + const evidence: Record = { index: hit._index, id: hit._id, source: src }; + if (entities.length > 0) evidence['contributingSrcIps'] = entities; return { id: `os-ad:${id}`, kind: 'opensearch_anomaly', severity, - title: `Anomaly detected (grade ${grade.toFixed(2)}, conf ${confidence.toFixed(2)})`, - summary: 'OpenSearch Anomaly Detection reported an anomaly.', - evidence: { index: hit._index, id: hit._id, source: src }, - window: { from: new Date(0).toISOString(), to: new Date().toISOString() }, + title, + summary: `OpenSearch AD grade ${grade.toFixed(2)}, confidence ${confidence.toFixed(2)}.`, + evidence, + window: adWindow(src), }; } diff --git a/src/insights/pollUtils.ts b/src/insights/pollUtils.ts index 62b72c1..5d9a9df 100644 --- a/src/insights/pollUtils.ts +++ b/src/insights/pollUtils.ts @@ -1,8 +1,13 @@ import type { Finding } from '../detectors/types.js'; import type { DetectionFetchResult } from './opensearchDetections.js'; -export function shouldSkipHeuristicPoll(alerting: DetectionFetchResult, ad: DetectionFetchResult): boolean { - return alerting.healthyEmpty === true && ad.healthyEmpty === true; +export function shouldSkipHeuristicPoll( + alerting: DetectionFetchResult, + ad: DetectionFetchResult, + nativePipelineReady: boolean, +): boolean { + if (!(alerting.healthyEmpty === true && ad.healthyEmpty === true)) return false; + return nativePipelineReady === true; } export function findingSeverityRank(s: Finding['severity']): number { diff --git a/src/opensearch/adLifecycle.ts b/src/opensearch/adLifecycle.ts new file mode 100644 index 0000000..c4012fa --- /dev/null +++ b/src/opensearch/adLifecycle.ts @@ -0,0 +1,224 @@ +import type { SearchClient } from '../search/types.js'; +import { getLogger, logErr } from '../logging/logger.js'; +import { getString, isRecord } from '../util/guards.js'; +import { + KAYTOO_OS_DETECTOR_NAME, + KAYTOO_OS_FEATURE_NAME, + KAYTOO_OS_RESULT_INDEX, + detectionIntervalMinutes, +} from '../insights/nativeAnomalyConstants.js'; +import type { NativeAnomalyPipelineResult } from '../insights/nativeAnomalyTypes.js'; + +type TransportClient = SearchClient & { + transport?: { + request: (params: { + method: string; + path: string; + body?: string | Record; + }) => Promise<{ body: unknown; statusCode: number | null }>; + }; +}; + +async function osTransport( + client: SearchClient, + method: string, + path: string, + body?: Record, +): Promise<{ statusCode: number | null; body: unknown }> { + const t = (client as TransportClient).transport; + if (!t?.request) throw new Error('OpenSearch client has no transport.request'); + const res = await t.request({ + method, + path, + ...(body ? { body } : {}), + }); + return { statusCode: res.statusCode, body: res.body }; +} + +function indicesMatch(detectorIndices: unknown, indexPattern: string): boolean { + if (!Array.isArray(detectorIndices) || detectorIndices.length === 0) return false; + const bare = indexPattern.replace('*', ''); + return detectorIndices.some( + (ix) => + typeof ix === 'string' && + (ix === indexPattern || indexPattern.includes(ix.replace('*', '')) || ix.includes(bare)), + ); +} + +function categoryIncludes(det: Record, srcIpField: string): boolean { + const cf = det['category_field']; + if (Array.isArray(cf)) return cf.some((x) => x === srcIpField); + if (typeof cf === 'string') return cf === srcIpField; + return false; +} + +function featureSumsBytes(det: Record, bytesField: string): boolean { + const attrs = det['feature_attributes']; + if (!Array.isArray(attrs)) return false; + for (const a of attrs) { + if (!isRecord(a)) continue; + const q = a['aggregation_query']; + const s = JSON.stringify(q ?? {}); + if (s.includes('"sum"') && s.includes(bytesField)) return true; + } + return false; +} + +export function detectorMatchesEgressShape( + det: unknown, + indexPattern: string, + srcIpField: string, + bytesField: string, +): boolean { + if (!isRecord(det)) return false; + if (getString(det['time_field']) !== '@timestamp') return false; + if (!indicesMatch(det['indices'], indexPattern)) return false; + if (!categoryIncludes(det, srcIpField)) return false; + if (!featureSumsBytes(det, bytesField)) return false; + return true; +} + +function parseDetectorList(body: unknown): Array<{ id: string; raw: Record }> { + const out: Array<{ id: string; raw: Record }> = []; + if (!isRecord(body)) return out; + + const list = body['detectorList']; + if (Array.isArray(list)) { + for (const d of list) { + if (!isRecord(d)) continue; + const id = getString(d['id']) || getString(d['_id']); + if (id) out.push({ id, raw: d }); + } + if (out.length) return out; + } + + const hits = body['hits']; + if (isRecord(hits)) { + const hh = hits['hits']; + if (Array.isArray(hh)) { + for (const h of hh) { + if (!isRecord(h)) continue; + const id = typeof h['_id'] === 'string' ? h['_id'] : getString(h['_id']); + const src = isRecord(h['_source']) ? h['_source'] : h; + if (id && isRecord(src)) out.push({ id, raw: src }); + } + } + } + return out; +} + +function kaytooNameRank(raw: Record): number { + return getString(raw['name']) === KAYTOO_OS_DETECTOR_NAME ? 0 : 1; +} + +function pickEgressDetectors( + detectors: Array<{ id: string; raw: Record }>, + indexPattern: string, + srcIpField: string, + bytesField: string, +): string[] { + const matches = detectors.filter((d) => detectorMatchesEgressShape(d.raw, indexPattern, srcIpField, bytesField)); + if (matches.length === 0) return []; + const first = [...matches].sort( + (a, b) => kaytooNameRank(a.raw) - kaytooNameRank(b.raw) || a.id.localeCompare(b.id), + )[0]!; + return [first.id]; +} + +function buildCreateDetectorBody(opts: { + indexPattern: string; + srcIpField: string; + bytesField: string; + pollIntervalSeconds: number; +}): Record { + const interval = detectionIntervalMinutes(opts.pollIntervalSeconds); + return { + name: KAYTOO_OS_DETECTOR_NAME, + description: 'Kaytoo-managed high-cardinality egress volume anomaly detector.', + time_field: '@timestamp', + indices: [opts.indexPattern], + detector_type: 'MULTI_ENTITY', + category_field: [opts.srcIpField], + feature_attributes: [ + { + feature_name: KAYTOO_OS_FEATURE_NAME, + feature_enabled: true, + aggregation_query: { + [KAYTOO_OS_FEATURE_NAME]: { sum: { field: opts.bytesField } }, + }, + }, + ], + detection_interval: { period: { interval, unit: 'Minutes' } }, + window_delay: { period: { interval: 1, unit: 'Minutes' } }, + result_index: KAYTOO_OS_RESULT_INDEX, + }; +} + +export async function ensureOpenSearchAnomalyPipeline(opts: { + client: SearchClient; + indexPattern: string; + srcIpField: string; + bytesField: string; + pollIntervalSeconds: number; +}): Promise { + const log = getLogger({ component: 'insights.nativeAnomaly' }); + try { + const searchRes = await osTransport(opts.client, 'POST', '/_plugins/_anomaly_detection/detectors/_search', { + query: { match_all: {} }, + size: 500, + }); + if (searchRes.statusCode === 404) { + return { ok: false, hasScopedSources: false, warning: 'OpenSearch Anomaly Detection plugin not available (404).' }; + } + if (searchRes.statusCode && searchRes.statusCode >= 400) { + return { + ok: false, + hasScopedSources: false, + warning: `OpenSearch AD search failed (${searchRes.statusCode}).`, + }; + } + + const listed = parseDetectorList(searchRes.body); + let detectorIds = pickEgressDetectors(listed, opts.indexPattern, opts.srcIpField, opts.bytesField); + + if (detectorIds.length === 0) { + const createRes = await osTransport(opts.client, 'POST', '/_plugins/_anomaly_detection/detectors', buildCreateDetectorBody(opts)); + if (createRes.statusCode && createRes.statusCode >= 400) { + log.warn({ statusCode: createRes.statusCode, body: createRes.body }, 'OpenSearch AD create detector failed'); + return { + ok: false, + hasScopedSources: false, + warning: 'Could not create Kaytoo OpenSearch anomaly detector (insufficient permissions or validation error).', + }; + } + const created = isRecord(createRes.body) ? createRes.body : {}; + const newId = getString(created['_id']); + if (newId) detectorIds = [newId]; + else { + const relist = parseDetectorList( + (await osTransport(opts.client, 'POST', '/_plugins/_anomaly_detection/detectors/_search', { + query: { term: { name: KAYTOO_OS_DETECTOR_NAME } }, + size: 10, + })).body, + ); + detectorIds = pickEgressDetectors(relist, opts.indexPattern, opts.srcIpField, opts.bytesField); + } + } + + if (detectorIds.length === 0) { + return { ok: false, hasScopedSources: false, warning: 'OpenSearch AD: no detector id after ensure step.' }; + } + + for (const id of detectorIds) { + const start = await osTransport(opts.client, 'POST', `/_plugins/_anomaly_detection/detectors/${encodeURIComponent(id)}/_start`); + if (start.statusCode && start.statusCode >= 400) { + log.debug({ detectorId: id, statusCode: start.statusCode, body: start.body }, 'OpenSearch AD start (may already be running)'); + } + } + + return { ok: true, hasScopedSources: true, opensearch: { detectorIds } }; + } catch (e) { + log.warn({ ...logErr(e) }, 'OpenSearch AD pipeline ensure failed'); + return { ok: false, hasScopedSources: false, warning: 'OpenSearch AD pipeline ensure threw.' }; + } +} diff --git a/test/elasticsearch.mlAnomalyLifecycle.test.ts b/test/elasticsearch.mlAnomalyLifecycle.test.ts new file mode 100644 index 0000000..7440fbd --- /dev/null +++ b/test/elasticsearch.mlAnomalyLifecycle.test.ts @@ -0,0 +1,221 @@ +import { describe, expect, it, vi } from 'vitest'; +import { mlJobMatchesEgressShape, ensureElasticsearchMlAnomalyPipeline } from '../src/elasticsearch/mlAnomalyLifecycle.js'; + +describe('mlJobMatchesEgressShape', () => { + const job = { + job_id: 'j1', + data_description: { time_field: '@timestamp' }, + analysis_config: { + detectors: [{ function: 'sum', field_name: 'flow.bytes', over_field_name: 'flow.client.ip.addr' }], + }, + datafeed_config: { indices: ['flow-*'] }, + }; + + it('matches egress population job', () => { + expect(mlJobMatchesEgressShape(job, 'flow-*', 'flow.client.ip.addr', 'flow.bytes')).toBe(true); + }); + + it('rejects wrong detector function', () => { + expect( + mlJobMatchesEgressShape( + { + ...job, + analysis_config: { + detectors: [{ function: 'mean', field_name: 'flow.bytes', over_field_name: 'flow.client.ip.addr' }], + }, + }, + 'flow-*', + 'flow.client.ip.addr', + 'flow.bytes', + ), + ).toBe(false); + }); + + it('matches when datafeed indices array is empty', () => { + expect( + mlJobMatchesEgressShape( + { + ...job, + datafeed_config: { indices: [] }, + }, + 'flow-*', + 'flow.client.ip.addr', + 'flow.bytes', + ), + ).toBe(true); + }); + + it('matches index via pattern prefix overlap', () => { + expect( + mlJobMatchesEgressShape( + { + ...job, + datafeed_config: { indices: ['flow-codex'] }, + }, + 'flow-codex-*', + 'flow.client.ip.addr', + 'flow.bytes', + ), + ).toBe(true); + }); +}); + +describe('ensureElasticsearchMlAnomalyPipeline', () => { + it('adopts existing matching job', async () => { + const ml = { + getJobs: vi.fn().mockResolvedValue({ + jobs: [ + { + job_id: 'kaytoo-flow-egress-by-src', + data_description: { time_field: '@timestamp' }, + analysis_config: { + detectors: [{ function: 'sum', field_name: 'flow.bytes', over_field_name: 'flow.client.ip.addr' }], + }, + datafeed_config: { indices: ['flow-*'] }, + }, + ], + }), + openJob: vi.fn().mockResolvedValue({}), + getDatafeeds: vi.fn().mockResolvedValue({ datafeeds: [{ datafeed_id: 'kaytoo-flow-egress-by-src-datafeed' }] }), + startDatafeed: vi.fn().mockResolvedValue({}), + putJob: vi.fn(), + putDatafeed: vi.fn(), + }; + const client = { ml } as never; + const r = await ensureElasticsearchMlAnomalyPipeline({ + client, + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(r.elasticsearch?.jobIds).toEqual(['kaytoo-flow-egress-by-src']); + expect(ml.putJob).not.toHaveBeenCalled(); + }); + + it('returns ok when getDatafeeds fails during adopted job startup', async () => { + const ml = { + getJobs: vi.fn().mockResolvedValue({ + jobs: [ + { + job_id: 'kaytoo-flow-egress-by-src', + data_description: { time_field: '@timestamp' }, + analysis_config: { + detectors: [{ function: 'sum', field_name: 'flow.bytes', over_field_name: 'flow.client.ip.addr' }], + }, + datafeed_config: { indices: ['flow-*'] }, + }, + ], + }), + openJob: vi.fn().mockResolvedValue({}), + getDatafeeds: vi.fn().mockRejectedValue(new Error('ml getDatafeeds unavailable')), + startDatafeed: vi.fn(), + putJob: vi.fn(), + putDatafeed: vi.fn(), + }; + const r = await ensureElasticsearchMlAnomalyPipeline({ + client: { ml } as never, + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(ml.startDatafeed).not.toHaveBeenCalled(); + }); + + it('creates job and datafeed when none match', async () => { + const ml = { + getJobs: vi.fn().mockResolvedValue({ jobs: [] }), + putJob: vi.fn().mockResolvedValue({}), + putDatafeed: vi.fn().mockResolvedValue({}), + openJob: vi.fn().mockResolvedValue({}), + getDatafeeds: vi.fn().mockResolvedValue({ datafeeds: [{ datafeed_id: 'kaytoo-flow-egress-by-src-datafeed' }] }), + startDatafeed: vi.fn().mockResolvedValue({}), + }; + const r = await ensureElasticsearchMlAnomalyPipeline({ + client: { ml } as never, + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(ml.putJob).toHaveBeenCalled(); + expect(ml.putDatafeed).toHaveBeenCalled(); + }); + + it('treats putJob resource_already_exists as non-fatal', async () => { + const ml = { + getJobs: vi.fn().mockResolvedValue({ jobs: [] }), + putJob: vi.fn().mockRejectedValue(new Error('resource_already_exists_exception')), + putDatafeed: vi.fn().mockResolvedValue({}), + openJob: vi.fn().mockResolvedValue({}), + getDatafeeds: vi.fn().mockResolvedValue({ datafeeds: [{ datafeed_id: 'kaytoo-flow-egress-by-src-datafeed' }] }), + startDatafeed: vi.fn().mockResolvedValue({}), + }; + const r = await ensureElasticsearchMlAnomalyPipeline({ + client: { ml } as never, + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(ml.putDatafeed).toHaveBeenCalled(); + }); + + it('treats putDatafeed resource_already_exists as non-fatal', async () => { + const ml = { + getJobs: vi.fn().mockResolvedValue({ jobs: [] }), + putJob: vi.fn().mockResolvedValue({}), + putDatafeed: vi.fn().mockRejectedValue(new Error('resource_already_exists_exception')), + openJob: vi.fn().mockResolvedValue({}), + getDatafeeds: vi.fn().mockResolvedValue({ datafeeds: [{ datafeed_id: 'kaytoo-flow-egress-by-src-datafeed' }] }), + startDatafeed: vi.fn().mockResolvedValue({}), + }; + const r = await ensureElasticsearchMlAnomalyPipeline({ + client: { ml } as never, + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(ml.openJob).toHaveBeenCalled(); + }); + + it('returns not ok when putDatafeed fails without resource_already_exists', async () => { + const ml = { + getJobs: vi.fn().mockResolvedValue({ jobs: [] }), + putJob: vi.fn().mockResolvedValue({}), + putDatafeed: vi.fn().mockRejectedValue(new Error('forbidden')), + openJob: vi.fn(), + getDatafeeds: vi.fn(), + startDatafeed: vi.fn(), + }; + const r = await ensureElasticsearchMlAnomalyPipeline({ + client: { ml } as never, + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(false); + }); + + it('returns not ok when getJobs throws', async () => { + const ml = { + getJobs: vi.fn().mockRejectedValue(new Error('cluster block')), + }; + const r = await ensureElasticsearchMlAnomalyPipeline({ + client: { ml } as never, + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(false); + }); +}); diff --git a/test/elasticsearchDetections.test.ts b/test/elasticsearchDetections.test.ts new file mode 100644 index 0000000..061e9e7 --- /dev/null +++ b/test/elasticsearchDetections.test.ts @@ -0,0 +1,117 @@ +import { describe, expect, it, vi } from 'vitest'; +import { fetchElasticsearchMlAnomalyFindings } from '../src/insights/elasticsearchDetections.js'; + +describe('fetchElasticsearchMlAnomalyFindings', () => { + it('returns healthyEmpty false when jobIds empty', async () => { + const r = await fetchElasticsearchMlAnomalyFindings({ + client: {} as never, + jobIds: [], + now: new Date(), + minutesBack: 10, + }); + expect(r.findings).toEqual([]); + expect(r.healthyEmpty).toBe(false); + }); + + it('maps ML records to findings', async () => { + const ml = { + getRecords: vi.fn().mockResolvedValue({ + records: [ + { + record_score: 95, + initial_record_score: 0, + over_field_value: '10.0.0.1', + timestamp: 1_700_000_000_000, + }, + ], + }), + }; + const r = await fetchElasticsearchMlAnomalyFindings({ + client: { ml } as never, + jobIds: ['job-a'], + now: new Date('2024-01-15T12:00:00Z'), + minutesBack: 60, + }); + expect(r.ok).toBe(true); + expect(r.findings).toHaveLength(1); + expect(r.findings[0]!.kind).toBe('elasticsearch_ml_anomaly'); + expect(r.findings[0]!.severity).toBe('high'); + expect(r.findings[0]!.evidence['contributingSrcIps']).toEqual(['10.0.0.1']); + }); + + it('returns ok false when getRecords throws', async () => { + const ml = { getRecords: vi.fn().mockRejectedValue(new Error('ml down')) }; + const r = await fetchElasticsearchMlAnomalyFindings({ + client: { ml } as never, + jobIds: ['j'], + now: new Date(), + minutesBack: 5, + }); + expect(r.ok).toBe(false); + expect(r.warning).toMatch(/failed/i); + }); + + it('maps medium and low severity from score', async () => { + const ml = { + getRecords: vi.fn().mockResolvedValue({ + records: [ + { record_score: 60, over_field_value: '10.0.0.2', timestamp: '2024-01-01T00:00:00Z' }, + { record_score: 10, over_field_value: '10.0.0.3', timestamp: '2024-01-01T00:00:00Z' }, + ], + }), + }; + const r = await fetchElasticsearchMlAnomalyFindings({ + client: { ml } as never, + jobIds: ['j'], + now: new Date('2024-01-15T12:00:00Z'), + minutesBack: 60, + }); + expect(r.findings.map((f) => f.severity)).toEqual(['medium', 'low']); + }); + + it('uses initial_record_score when record_score is zero', async () => { + const ml = { + getRecords: vi.fn().mockResolvedValue({ + records: [{ record_score: 0, initial_record_score: 88, over_field_value: '', timestamp: '' }], + }), + }; + const r = await fetchElasticsearchMlAnomalyFindings({ + client: { ml } as never, + jobIds: ['j'], + now: new Date('2024-01-15T12:00:00Z'), + minutesBack: 10, + }); + expect(r.findings[0]!.severity).toBe('medium'); + expect(r.findings[0]!.title).toMatch(/^ML anomaly \(score/); + expect(r.findings[0]!.window.from).toMatch(/^1970/); + }); + + it('skips non-object records and zero scores', async () => { + const ml = { + getRecords: vi.fn().mockResolvedValue({ + records: [null, { record_score: 0, initial_record_score: 0 }, { record_score: 5, over_field_value: '1.1.1.1' }], + }), + }; + const r = await fetchElasticsearchMlAnomalyFindings({ + client: { ml } as never, + jobIds: ['j'], + now: new Date('2024-01-15T12:00:00Z'), + minutesBack: 10, + }); + expect(r.findings).toHaveLength(1); + expect(r.healthyEmpty).toBe(false); + }); + + it('treats missing records array as empty', async () => { + const ml = { getRecords: vi.fn().mockResolvedValue({}) }; + const r = await fetchElasticsearchMlAnomalyFindings({ + client: { ml } as never, + jobIds: ['j'], + now: new Date(), + minutesBack: 5, + }); + expect(r.ok).toBe(true); + expect(r.findings).toEqual([]); + expect(r.healthyEmpty).toBe(true); + }); +}); diff --git a/test/enrichEgressEvidence.test.ts b/test/enrichEgressEvidence.test.ts index 1f890ba..54e6db6 100644 --- a/test/enrichEgressEvidence.test.ts +++ b/test/enrichEgressEvidence.test.ts @@ -31,6 +31,31 @@ describe('enrichEgressFinding', () => { expect(client.search).not.toHaveBeenCalled(); }); + it('enriches opensearch_anomaly like egress when contributingSrcIps present', async () => { + const f: Finding = { + id: 'os', + kind: 'opensearch_anomaly', + severity: 'high', + title: 't', + summary: 's', + evidence: { contributingSrcIps: ['10.0.0.1'] }, + window: { from: '2020-01-01T00:00:00.000Z', to: '2020-01-01T00:15:00.000Z' }, + }; + const client = { + search: vi.fn().mockResolvedValue({ + body: { + aggregations: { + by_dst: { buckets: [{ key: '8.8.8.8', doc_count: 1, dst_bytes: { value: 10 } }] }, + by_dport: { buckets: [] }, + }, + }, + }), + } as unknown as SearchClient; + const out = await enrichEgressFinding({ client, index: 'ix', fields, finding: f }); + expect(client.search).toHaveBeenCalled(); + expect(out.evidence['topDestinations']).toBeDefined(); + }); + it('merges aggregation results into evidence', async () => { const finding: Finding = { id: 'egress:10.0.0.1', diff --git a/test/insightEngine.poll.test.ts b/test/insightEngine.poll.test.ts index 399a670..864492f 100644 --- a/test/insightEngine.poll.test.ts +++ b/test/insightEngine.poll.test.ts @@ -8,6 +8,7 @@ import * as logger from '../src/logging/logger.js'; const eng = vi.hoisted(() => ({ fetchAlerting: vi.fn(), fetchAd: vi.fn(), + ensureNative: vi.fn(), queryTopEgressBySource: vi.fn(), queryPortscanCandidates: vi.fn(), summarizeFindings: vi.fn(), @@ -68,9 +69,14 @@ function installInsightsLoggerSpy(stub: ReturnType spy.mockRestore(); } -function resetEngMocks(opts?: { alerting?: MockDetectionFetch; ad?: MockDetectionFetch }) { +function resetEngMocks(opts?: { + alerting?: MockDetectionFetch; + ad?: MockDetectionFetch; + ensureNative?: { pipeline: { ok: boolean; hasScopedSources: boolean }; esMlClient: null }; +}) { eng.fetchAlerting.mockReset(); eng.fetchAd.mockReset(); + eng.ensureNative.mockReset(); eng.queryTopEgressBySource.mockReset(); eng.queryPortscanCandidates.mockReset(); eng.summarizeFindings.mockReset(); @@ -79,14 +85,21 @@ function resetEngMocks(opts?: { alerting?: MockDetectionFetch; ad?: MockDetectio opts?.alerting ?? { ok: true, findings: [], healthyEmpty: true }, ); eng.fetchAd.mockResolvedValue(opts?.ad ?? { ok: true, findings: [], healthyEmpty: true }); + eng.ensureNative.mockResolvedValue( + opts?.ensureNative ?? { pipeline: { ok: false, hasScopedSources: false }, esMlClient: null }, + ); eng.queryTopEgressBySource.mockResolvedValue([]); eng.queryPortscanCandidates.mockResolvedValue([]); eng.summarizeFindings.mockResolvedValue({ post: true, text: 'summary text' }); } -vi.mock('../src/insights/opensearchDetections.js', () => ({ - fetchOpenSearchAlertingFindings: (...a: unknown[]) => eng.fetchAlerting(...a) as Promise, - fetchOpenSearchAdFindings: (...a: unknown[]) => eng.fetchAd(...a) as Promise, +vi.mock('../src/insights/nativeAnomalyPipeline.js', () => ({ + ensureNativeAnomalyPipeline: (...a: unknown[]) => eng.ensureNative(...a) as Promise, +})); + +vi.mock('../src/insights/nativeDetections.js', () => ({ + fetchNativeAlertFindings: (...a: unknown[]) => eng.fetchAlerting(...a) as Promise, + fetchNativeAnomalyFindings: (...a: unknown[]) => eng.fetchAd(...a) as Promise, })); vi.mock('../src/opensearch/queries/index.js', () => ({ @@ -177,7 +190,11 @@ describe('startInsightEngine', () => { vi.useRealTimers(); }); - it('skips heuristics when OpenSearch alerting and AD are healthy empty', async () => { + it('skips heuristics when alerting and AD are healthy empty and native pipeline is ready', async () => { + eng.ensureNative.mockResolvedValueOnce({ + pipeline: { ok: true, hasScopedSources: true }, + esMlClient: null, + }); const { startInsightEngine } = await import('../src/insights/engine.js'); const insightSink = mockInsightSink(); const { stop } = await startInsightEngine({ @@ -185,6 +202,7 @@ describe('startInsightEngine', () => { insightSink, }); expect(insightSink.postInsight).not.toHaveBeenCalled(); + expect(eng.queryTopEgressBySource).not.toHaveBeenCalled(); stop(); }); diff --git a/test/nativeAnomalyConstants.test.ts b/test/nativeAnomalyConstants.test.ts new file mode 100644 index 0000000..c0cc113 --- /dev/null +++ b/test/nativeAnomalyConstants.test.ts @@ -0,0 +1,11 @@ +import { describe, expect, it } from 'vitest'; +import { detectionIntervalMinutes } from '../src/insights/nativeAnomalyConstants.js'; + +describe('detectionIntervalMinutes', () => { + it('clamps to minimum 5 and maximum 60', () => { + expect(detectionIntervalMinutes(30)).toBe(5); + expect(detectionIntervalMinutes(120)).toBe(5); + expect(detectionIntervalMinutes(600)).toBe(10); + expect(detectionIntervalMinutes(3600)).toBe(60); + }); +}); diff --git a/test/nativeAnomalyPipeline.test.ts b/test/nativeAnomalyPipeline.test.ts new file mode 100644 index 0000000..24a7e17 --- /dev/null +++ b/test/nativeAnomalyPipeline.test.ts @@ -0,0 +1,89 @@ +import { describe, expect, it, vi } from 'vitest'; + +const elasticMocks = vi.hoisted(() => ({ + createElasticsearchMlClient: vi.fn(), + ensureElasticsearchMlAnomalyPipeline: vi.fn(), +})); + +vi.mock('../src/opensearch/adLifecycle.js', () => ({ + ensureOpenSearchAnomalyPipeline: vi.fn().mockResolvedValue({ + ok: true, + hasScopedSources: true, + opensearch: { detectorIds: ['d'] }, + }), +})); + +vi.mock('../src/elasticsearch/mlAnomalyLifecycle.js', () => elasticMocks); + +describe('ensureNativeAnomalyPipeline', () => { + it('delegates to OpenSearch lifecycle', async () => { + const { ensureNativeAnomalyPipeline } = await import('../src/insights/nativeAnomalyPipeline.js'); + const { ensureOpenSearchAnomalyPipeline } = await import('../src/opensearch/adLifecycle.js'); + const r = await ensureNativeAnomalyPipeline({ + backend: 'opensearch', + search: {} as never, + searchClient: {} as never, + indexPattern: 'x-*', + fields: { + bytesField: 'b', + srcIpField: 's', + dstIpField: 'd', + srcPortField: '1', + dstPortField: '2', + } as import('../src/opensearch/fieldCaps.js').FieldPreference, + pollIntervalSeconds: 60, + }); + expect(ensureOpenSearchAnomalyPipeline).toHaveBeenCalled(); + expect(r.esMlClient).toBeNull(); + expect(r.pipeline.opensearch?.detectorIds).toEqual(['d']); + }); + + it('delegates to Elasticsearch ML lifecycle', async () => { + elasticMocks.createElasticsearchMlClient.mockResolvedValue({ ml: {} } as never); + elasticMocks.ensureElasticsearchMlAnomalyPipeline.mockResolvedValue({ + ok: true, + hasScopedSources: true, + elasticsearch: { jobIds: ['j1'] }, + }); + const { ensureNativeAnomalyPipeline } = await import('../src/insights/nativeAnomalyPipeline.js'); + const r = await ensureNativeAnomalyPipeline({ + backend: 'elasticsearch', + search: { url: 'http://x', username: 'u', password: 'p', backend: 'elasticsearch', tlsInsecure: false, indexPattern: 'x' }, + searchClient: {} as never, + indexPattern: 'flow-*', + fields: { + bytesField: 'b', + srcIpField: 's', + dstIpField: 'd', + srcPortField: '1', + dstPortField: '2', + } as import('../src/opensearch/fieldCaps.js').FieldPreference, + pollIntervalSeconds: 120, + }); + expect(elasticMocks.createElasticsearchMlClient).toHaveBeenCalled(); + expect(elasticMocks.ensureElasticsearchMlAnomalyPipeline).toHaveBeenCalled(); + expect(r.pipeline.elasticsearch?.jobIds).toEqual(['j1']); + }); + + it('returns when Elasticsearch ML client cannot be created', async () => { + elasticMocks.createElasticsearchMlClient.mockRejectedValueOnce(new Error('refused')); + const { ensureNativeAnomalyPipeline } = await import('../src/insights/nativeAnomalyPipeline.js'); + const r = await ensureNativeAnomalyPipeline({ + backend: 'elasticsearch', + search: { url: 'http://x', username: 'u', password: 'p', backend: 'elasticsearch', tlsInsecure: false, indexPattern: 'x' }, + searchClient: {} as never, + indexPattern: 'flow-*', + fields: { + bytesField: 'b', + srcIpField: 's', + dstIpField: 'd', + srcPortField: '1', + dstPortField: '2', + } as import('../src/opensearch/fieldCaps.js').FieldPreference, + pollIntervalSeconds: 120, + }); + expect(r.esMlClient).toBeNull(); + expect(r.pipeline.ok).toBe(false); + expect(r.pipeline.warning).toMatch(/unavailable/i); + }); +}); diff --git a/test/nativeDetections.test.ts b/test/nativeDetections.test.ts new file mode 100644 index 0000000..6e512af --- /dev/null +++ b/test/nativeDetections.test.ts @@ -0,0 +1,92 @@ +import { describe, expect, it, vi } from 'vitest'; +import { + fetchNativeAlertFindings, + fetchNativeAnomalyFindings, +} from '../src/insights/nativeDetections.js'; + +describe('fetchNativeAlertFindings', () => { + it('returns empty for elasticsearch backend', async () => { + const r = await fetchNativeAlertFindings({ + backend: 'elasticsearch', + client: {} as never, + now: new Date(), + minutesBack: 5, + }); + expect(r.findings).toEqual([]); + expect(r.healthyEmpty).toBe(false); + }); + + it('delegates to OpenSearch alerting search', async () => { + const search = vi.fn().mockResolvedValue({ + body: { _shards: { total: 0 }, hits: { hits: [] } }, + }); + await fetchNativeAlertFindings({ + backend: 'opensearch', + client: { search, fieldCaps: vi.fn() } as never, + now: new Date(), + minutesBack: 5, + }); + expect(search).toHaveBeenCalled(); + }); +}); + +describe('fetchNativeAnomalyFindings', () => { + it('returns empty for elasticsearch without ML client', async () => { + const r = await fetchNativeAnomalyFindings({ + backend: 'elasticsearch', + searchClient: {} as never, + esMlClient: null, + pipeline: { ok: true, hasScopedSources: true, elasticsearch: { jobIds: ['j'] } }, + now: new Date(), + minutesBack: 5, + }); + expect(r.findings).toEqual([]); + expect(r.healthyEmpty).toBe(false); + }); + + it('delegates OpenSearch to AD search with detector filter', async () => { + const search = vi.fn().mockResolvedValue({ + body: { + _shards: { total: 1 }, + hits: { + hits: [ + { + _id: 'h1', + _index: 'ad', + _source: { anomaly_grade: 0.8, confidence: 0.5, detector_id: 'd1' }, + }, + ], + }, + }, + }); + const r = await fetchNativeAnomalyFindings({ + backend: 'opensearch', + searchClient: { search, fieldCaps: vi.fn() } as never, + esMlClient: null, + pipeline: { ok: true, hasScopedSources: true, opensearch: { detectorIds: ['d1'] } }, + now: new Date(), + minutesBack: 10, + }); + expect(search).toHaveBeenCalled(); + expect(r.findings).toHaveLength(1); + expect(r.findings[0]!.kind).toBe('opensearch_anomaly'); + }); + + it('delegates Elasticsearch to ML getRecords', async () => { + const ml = { + getRecords: vi.fn().mockResolvedValue({ + records: [{ record_score: 60, initial_record_score: 0, over_field_value: '1.1.1.1', timestamp: 1 }], + }), + }; + const r = await fetchNativeAnomalyFindings({ + backend: 'elasticsearch', + searchClient: {} as never, + esMlClient: { ml } as never, + pipeline: { ok: true, hasScopedSources: true, elasticsearch: { jobIds: ['job1'] } }, + now: new Date('2024-06-01T12:00:00Z'), + minutesBack: 30, + }); + expect(ml.getRecords).toHaveBeenCalled(); + expect(r.findings.length).toBeGreaterThan(0); + }); +}); diff --git a/test/opensearch.adLifecycle.test.ts b/test/opensearch.adLifecycle.test.ts new file mode 100644 index 0000000..91505c5 --- /dev/null +++ b/test/opensearch.adLifecycle.test.ts @@ -0,0 +1,335 @@ +import { describe, expect, it, vi } from 'vitest'; +import { detectorMatchesEgressShape, ensureOpenSearchAnomalyPipeline } from '../src/opensearch/adLifecycle.js'; + +describe('detectorMatchesEgressShape', () => { + const shaped = { + time_field: '@timestamp', + indices: ['flow-codex-*'], + category_field: ['flow.client.ip.addr'], + feature_attributes: [ + { aggregation_query: { kaytoo_sum_bytes: { sum: { field: 'flow.bytes' } } } }, + ], + }; + + it('returns true for egress-shaped detector', () => { + expect( + detectorMatchesEgressShape(shaped, 'flow-codex-*', 'flow.client.ip.addr', 'flow.bytes'), + ).toBe(true); + }); + + it('returns false when time field differs', () => { + expect(detectorMatchesEgressShape({ ...shaped, time_field: 'ts' }, 'flow-codex-*', 'flow.client.ip.addr', 'flow.bytes')).toBe( + false, + ); + }); + + it('returns false when indices do not match pattern', () => { + expect( + detectorMatchesEgressShape({ ...shaped, indices: ['other-*'] }, 'flow-codex-*', 'flow.client.ip.addr', 'flow.bytes'), + ).toBe(false); + }); + + it('returns false when category field missing', () => { + expect( + detectorMatchesEgressShape({ ...shaped, category_field: ['other'] }, 'flow-codex-*', 'flow.client.ip.addr', 'flow.bytes'), + ).toBe(false); + }); + + it('returns false when category_field is neither string nor string array', () => { + expect( + detectorMatchesEgressShape({ ...shaped, category_field: 1 as unknown as string[] }, 'flow-codex-*', 'flow.client.ip.addr', 'flow.bytes'), + ).toBe(false); + }); + + it('matches category_field when stored as a single string', () => { + const det = { + time_field: '@timestamp', + indices: ['flow-codex-*'], + category_field: 'flow.client.ip.addr', + feature_attributes: shaped.feature_attributes, + }; + expect(detectorMatchesEgressShape(det, 'flow-codex-*', 'flow.client.ip.addr', 'flow.bytes')).toBe(true); + }); + + it('matches index when pattern contains detector index without wildcard', () => { + const det = { + ...shaped, + indices: ['elastiflow-flow-codex'], + }; + expect(detectorMatchesEgressShape(det, 'elastiflow-flow-codex-*', 'flow.client.ip.addr', 'flow.bytes')).toBe(true); + }); + + it('returns false when sum bytes feature missing', () => { + expect( + detectorMatchesEgressShape( + { + ...shaped, + feature_attributes: [{ aggregation_query: { x: { avg: { field: 'flow.bytes' } } } }], + }, + 'flow-codex-*', + 'flow.client.ip.addr', + 'flow.bytes', + ), + ).toBe(false); + }); +}); + +describe('ensureOpenSearchAnomalyPipeline', () => { + function clientWithTransport(transport: ReturnType) { + return { transport: { request: transport } } as never; + } + + it('returns not ok on plugin 404', async () => { + const transport = vi.fn().mockResolvedValue({ statusCode: 404, body: {} }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(false); + expect(r.hasScopedSources).toBe(false); + }); + + it('adopts first matching detector and starts it', async () => { + const transport = vi + .fn() + .mockResolvedValueOnce({ + statusCode: 200, + body: { + hits: { + hits: [ + { + _id: 'det1', + _source: { + name: 'Kaytoo flow egress by source', + time_field: '@timestamp', + indices: ['flow-*'], + category_field: ['flow.client.ip.addr'], + feature_attributes: [ + { aggregation_query: { k: { sum: { field: 'flow.bytes' } } } }, + ], + }, + }, + ], + }, + }, + }) + .mockResolvedValue({ statusCode: 200, body: {} }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(r.hasScopedSources).toBe(true); + expect(r.opensearch?.detectorIds).toEqual(['det1']); + expect(transport).toHaveBeenCalledWith( + expect.objectContaining({ method: 'POST', path: expect.stringContaining('detectors') }), + ); + }); + + it('returns warning when AD search returns 500', async () => { + const transport = vi.fn().mockResolvedValue({ statusCode: 500, body: {} }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(false); + }); + + it('returns not ok when transport throws', async () => { + const transport = vi.fn().mockRejectedValue(new Error('network')); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(false); + expect(r.warning).toMatch(/threw/i); + }); + + it('parses detectorList array from search response', async () => { + const transport = vi + .fn() + .mockResolvedValueOnce({ + statusCode: 200, + body: { + detectorList: [ + { + id: 'dl1', + time_field: '@timestamp', + indices: ['flow-*'], + category_field: ['flow.client.ip.addr'], + feature_attributes: [{ aggregation_query: { k: { sum: { field: 'flow.bytes' } } } }], + }, + ], + }, + }) + .mockResolvedValue({ statusCode: 200, body: {} }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(r.opensearch?.detectorIds).toEqual(['dl1']); + }); + + it('re-lists when create omits _id', async () => { + const shaped = { + time_field: '@timestamp', + indices: ['flow-*'], + category_field: ['flow.client.ip.addr'], + feature_attributes: [{ aggregation_query: { k: { sum: { field: 'flow.bytes' } } } }], + }; + const transport = vi + .fn() + .mockResolvedValueOnce({ statusCode: 200, body: { hits: { hits: [] } } }) + .mockResolvedValueOnce({ statusCode: 201, body: {} }) + .mockResolvedValueOnce({ + statusCode: 200, + body: { hits: { hits: [{ _id: 'rel1', _source: { name: 'Kaytoo flow egress by source', ...shaped } }] } }, + }) + .mockResolvedValue({ statusCode: 200, body: {} }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(r.opensearch?.detectorIds).toEqual(['rel1']); + }); + + it('tolerates start returning 400', async () => { + const transport = vi + .fn() + .mockResolvedValueOnce({ + statusCode: 200, + body: { + hits: { + hits: [ + { + _id: 'det1', + _source: { + time_field: '@timestamp', + indices: ['flow-*'], + category_field: ['flow.client.ip.addr'], + feature_attributes: [{ aggregation_query: { k: { sum: { field: 'flow.bytes' } } } }], + }, + }, + ], + }, + }, + }) + .mockResolvedValueOnce({ statusCode: 400, body: {} }) + .mockResolvedValue({ statusCode: 200, body: {} }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + }); + + it('returns not ok when create fails with 400', async () => { + const transport = vi + .fn() + .mockResolvedValueOnce({ + statusCode: 200, + body: { hits: { hits: [] } }, + }) + .mockResolvedValueOnce({ statusCode: 400, body: { error: 'bad' } }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(false); + expect(r.hasScopedSources).toBe(false); + }); + + it('creates detector when none match', async () => { + const transport = vi + .fn() + .mockResolvedValueOnce({ + statusCode: 200, + body: { hits: { hits: [] } }, + }) + .mockResolvedValueOnce({ statusCode: 201, body: { _id: 'newdet' } }) + .mockResolvedValue({ statusCode: 200, body: {} }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(r.opensearch?.detectorIds).toEqual(['newdet']); + }); + + it('prefers Kaytoo-named detector when multiple match', async () => { + const shaped = { + time_field: '@timestamp', + indices: ['flow-*'], + category_field: ['flow.client.ip.addr'], + feature_attributes: [{ aggregation_query: { k: { sum: { field: 'flow.bytes' } } } }], + }; + const transport = vi + .fn() + .mockResolvedValueOnce({ + statusCode: 200, + body: { + hits: { + hits: [ + { _id: 'other', _source: { name: 'other-det', ...shaped } }, + { _id: 'kay', _source: { name: 'Kaytoo flow egress by source', ...shaped } }, + ], + }, + }, + }) + .mockResolvedValue({ statusCode: 200, body: {} }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.opensearch?.detectorIds).toEqual(['kay']); + }); + + it('returns not ok when create omits id and relist finds no detector', async () => { + const transport = vi + .fn() + .mockResolvedValueOnce({ statusCode: 200, body: { hits: { hits: [] } } }) + .mockResolvedValueOnce({ statusCode: 201, body: {} }) + .mockResolvedValueOnce({ statusCode: 200, body: { hits: { hits: [] } } }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(false); + expect(r.warning).toMatch(/no detector id/i); + }); +}); diff --git a/test/opensearchDetections.test.ts b/test/opensearchDetections.test.ts index 0e0971b..623e2b7 100644 --- a/test/opensearchDetections.test.ts +++ b/test/opensearchDetections.test.ts @@ -90,9 +90,56 @@ describe('fetchOpenSearchAlertingFindings', () => { expect(r.warning).toMatch(/not reachable/i); }); + it('parses string JSON search body for alerting', async () => { + const payload = { + _shards: { total: 1 }, + hits: { + hits: [{ _id: 's1', _index: 'ix', _source: { monitor_name: 'mon', trigger_name: 'tr' } }], + }, + }; + const search = vi.fn().mockResolvedValue({ body: JSON.stringify(payload) }); + const r = await fetchOpenSearchAlertingFindings({ + client: clientWithSearch(search) as never, + now: new Date(), + minutesBack: 1, + }); + expect(r.ok).toBe(true); + expect(r.findings).toHaveLength(1); + }); + + it('skips malformed string body and continues scan', async () => { + const search = vi + .fn() + .mockResolvedValueOnce({ body: 'not-json{' }) + .mockResolvedValueOnce({ + body: { + _shards: { total: 1 }, + hits: { hits: [{ _id: 'ok', _source: { monitor_id: 'm', trigger_name: 't' } }] }, + }, + }); + const r = await fetchOpenSearchAlertingFindings({ + client: clientWithSearch(search) as never, + now: new Date(), + minutesBack: 1, + }); + expect(r.ok).toBe(true); + expect(search).toHaveBeenCalledTimes(2); + }); }); describe('fetchOpenSearchAdFindings', () => { + it('returns healthyEmpty false when detectorIds is empty array (scoped mode)', async () => { + const search = vi.fn(); + const r = await fetchOpenSearchAdFindings({ + client: clientWithSearch(search) as never, + minutesBack: 10, + detectorIds: [], + }); + expect(r.findings).toEqual([]); + expect(r.healthyEmpty).toBe(false); + expect(search).not.toHaveBeenCalled(); + }); + it('maps anomaly grade to severity tiers', async () => { const hits = [ { _id: 'g1', _index: 'ad', _source: { anomaly_grade: 0.95, confidence: 0.9 } }, @@ -123,4 +170,100 @@ describe('fetchOpenSearchAdFindings', () => { expect(r.ok).toBe(true); expect(r.findings).toHaveLength(1); }); + + it('falls through AD patterns when first has zero shards', async () => { + const search = vi + .fn() + .mockResolvedValueOnce({ body: { _shards: { total: 0 }, hits: { hits: [] } } }) + .mockResolvedValueOnce({ + body: { + _shards: { total: 1 }, + hits: { + hits: [{ _id: 'z', _index: 'ad2', _source: { anomaly_grade: 0.8, confidence: 0.5, detector_name: 'd1' } }], + }, + }, + }); + const r = await fetchOpenSearchAdFindings({ client: clientWithSearch(search) as never, minutesBack: 10 }); + expect(r.ok).toBe(true); + expect(search).toHaveBeenCalledTimes(2); + expect(r.findings[0]!.title).toContain('d1'); + }); + + it('parses string JSON body for AD hits', async () => { + const inner = { + _shards: { total: 1 }, + hits: { hits: [{ _id: 'str', _source: { anomaly_grade: 0.85, confidence: 0.6, name: 'n1' } }] }, + }; + const search = vi.fn().mockResolvedValue({ body: JSON.stringify(inner) }); + const r = await fetchOpenSearchAdFindings({ client: clientWithSearch(search) as never, minutesBack: 10 }); + expect(r.findings).toHaveLength(1); + expect(r.findings[0]!.title).toContain('n1'); + }); + + it('includes detector_id terms filter when detectorIds provided', async () => { + const search = vi.fn().mockResolvedValue({ + body: { _shards: { total: 1 }, hits: { hits: [] } }, + }); + await fetchOpenSearchAdFindings({ + client: clientWithSearch(search) as never, + minutesBack: 7, + detectorIds: ['det-a', 'det-b'], + }); + const call = search.mock.calls[0]![0] as { body: { query: { bool: { filter: unknown[] } } } }; + expect(call.body.query.bool.filter[0]).toEqual({ terms: { detector_id: ['det-a', 'det-b'] } }); + }); + + it('truncates entity list in AD title when more than two', async () => { + const search = vi.fn().mockResolvedValue({ + body: { + _shards: { total: 1 }, + hits: { + hits: [ + { + _id: 'e1', + _source: { + anomaly_grade: 0.8, + confidence: 0.5, + detector_name: 'det', + entity: [{ value: '10.0.0.1' }, { value: '10.0.0.2' }, { value: '10.0.0.3' }], + }, + }, + ], + }, + }, + }); + const r = await fetchOpenSearchAdFindings({ client: clientWithSearch(search) as never, minutesBack: 10 }); + expect(r.findings[0]!.title).toMatch(/…/); + expect((r.findings[0]!.evidence as { contributingSrcIps?: string[] }).contributingSrcIps).toEqual([ + '10.0.0.1', + '10.0.0.2', + '10.0.0.3', + ]); + }); + + it('uses data_start_time and data_end_time for AD window when execution times absent', async () => { + const search = vi.fn().mockResolvedValue({ + body: { + _shards: { total: 1 }, + hits: { + hits: [ + { + _id: 'w1', + _source: { + anomaly_grade: 0.5, + confidence: 0.2, + data_start_time: '2020-01-01T00:00:00Z', + data_end_time: '2020-01-02T00:00:00Z', + }, + }, + ], + }, + }, + }); + const r = await fetchOpenSearchAdFindings({ client: clientWithSearch(search) as never, minutesBack: 10 }); + expect(r.findings[0]!.window).toEqual({ + from: '2020-01-01T00:00:00Z', + to: '2020-01-02T00:00:00Z', + }); + }); }); diff --git a/test/pollUtils.test.ts b/test/pollUtils.test.ts index 5f694af..47801b3 100644 --- a/test/pollUtils.test.ts +++ b/test/pollUtils.test.ts @@ -14,11 +14,12 @@ describe('shouldSkipHeuristicPoll', () => { healthyEmpty?: boolean; }; - it('is true only when both backends report healthy empty', () => { - expect(shouldSkipHeuristicPoll(r(true), r(true))).toBe(true); - expect(shouldSkipHeuristicPoll(r(true), r(false))).toBe(false); - expect(shouldSkipHeuristicPoll(r(false), r(true))).toBe(false); - expect(shouldSkipHeuristicPoll(r(false), r(false))).toBe(false); + it('is true only when both backends report healthy empty and native anomaly pipeline is ready', () => { + expect(shouldSkipHeuristicPoll(r(true), r(true), true)).toBe(true); + expect(shouldSkipHeuristicPoll(r(true), r(true), false)).toBe(false); + expect(shouldSkipHeuristicPoll(r(true), r(false), true)).toBe(false); + expect(shouldSkipHeuristicPoll(r(false), r(true), true)).toBe(false); + expect(shouldSkipHeuristicPoll(r(false), r(false), true)).toBe(false); }); it('returns false when alerting is not healthy empty', () => { @@ -26,6 +27,7 @@ describe('shouldSkipHeuristicPoll', () => { shouldSkipHeuristicPoll( { ok: false, findings: [], warning: 'x' }, { ok: true, findings: [], healthyEmpty: true }, + true, ), ).toBe(false); }); @@ -35,6 +37,7 @@ describe('shouldSkipHeuristicPoll', () => { shouldSkipHeuristicPoll( { ok: true, findings: [], healthyEmpty: true }, { ok: false, findings: [], warning: 'x' }, + true, ), ).toBe(false); }); @@ -44,6 +47,7 @@ describe('shouldSkipHeuristicPoll', () => { shouldSkipHeuristicPoll( { ok: true, findings: [] }, { ok: true, findings: [] }, + true, ), ).toBe(false); }); From f8b68231231ba21996fdb6dc0bcab02d00b0c8d5 Mon Sep 17 00:00:00 2001 From: Madison Grubb Date: Wed, 13 May 2026 23:01:17 -0400 Subject: [PATCH 2/5] fix(insights): address native anomaly review (ES errors, timestamps, logs) ## Changed - Detect ML `resource_already_exists` via `meta.body.error.type` and `body.error.type` when present, with string fallback for older clients. - Normalize ML record `timestamp` from epoch ms or ISO strings for stable finding ids and windows. - Log throttled JSON parse warnings for OpenSearch shard counts when the search body is a malformed string (same as hits path). - Debug log when multiple egress-shaped AD detectors or ML jobs match so operators see deterministic single adoption. ## Fixed - Heuristic feature match comment clarifies JSON substring check is not a full aggregation AST. --- src/elasticsearch/mlAnomalyLifecycle.ts | 50 ++++++++-- src/insights/elasticsearchDetections.ts | 14 ++- src/insights/opensearchDetections.ts | 2 +- src/opensearch/adLifecycle.ts | 19 +++- test/elasticsearch.mlAnomalyLifecycle.test.ts | 92 +++++++++++++++++++ test/elasticsearchDetections.test.ts | 2 + test/opensearch.adLifecycle.test.ts | 47 ++++++++++ 7 files changed, 210 insertions(+), 16 deletions(-) diff --git a/src/elasticsearch/mlAnomalyLifecycle.ts b/src/elasticsearch/mlAnomalyLifecycle.ts index 16cfe0c..687cb45 100644 --- a/src/elasticsearch/mlAnomalyLifecycle.ts +++ b/src/elasticsearch/mlAnomalyLifecycle.ts @@ -65,7 +65,39 @@ async function startDatafeedsForJob(client: ElasticsearchMlClient, jobId: string } } -function pickMatchingJobIds(jobs: unknown[], indexPattern: string, srcIpField: string, bytesField: string): string[] { +function elasticsearchErrorType(e: unknown): string { + if (!isRecord(e)) return ''; + const meta = e['meta']; + if (isRecord(meta)) { + const mb = meta['body']; + if (isRecord(mb)) { + const er = mb['error']; + if (isRecord(er)) { + const t = getString(er['type']); + if (t) return t; + } + } + } + const body = e['body']; + if (isRecord(body)) { + const er = body['error']; + if (isRecord(er)) return getString(er['type']); + } + return ''; +} + +function resourceAlreadyExists(e: unknown): boolean { + if (elasticsearchErrorType(e) === 'resource_already_exists_exception') return true; + return String(e).includes('resource_already_exists_exception'); +} + +function pickMatchingJobIds( + jobs: unknown[], + indexPattern: string, + srcIpField: string, + bytesField: string, + log: ReturnType, +): string[] { const matches: { id: string; kaytoo: number }[] = []; for (const j of jobs) { if (!isRecord(j)) continue; @@ -73,12 +105,16 @@ function pickMatchingJobIds(jobs: unknown[], indexPattern: string, srcIpField: s if (!id || !mlJobMatchesEgressShape(j, indexPattern, srcIpField, bytesField)) continue; matches.push({ id, kaytoo: id === KAYTOO_ES_JOB_ID ? 0 : 1 }); } + if (matches.length === 0) return []; matches.sort((a, b) => a.kaytoo - b.kaytoo || a.id.localeCompare(b.id)); - return matches.length ? [matches[0]!.id] : []; -} - -function resourceAlreadyExists(e: unknown): boolean { - return String(e).includes('resource_already_exists_exception'); + const chosen = matches[0]!.id; + if (matches.length > 1) { + log.debug( + { chosenMlJobId: chosen, otherMatchingMlJobIds: matches.slice(1).map((m) => m.id) }, + 'Multiple ML jobs matched egress shape; using deterministic tie-break.', + ); + } + return [chosen]; } export async function ensureElasticsearchMlAnomalyPipeline(opts: { @@ -92,7 +128,7 @@ export async function ensureElasticsearchMlAnomalyPipeline(opts: { try { const list = await opts.client.ml.getJobs({}); const jobs = isRecord(list) && Array.isArray(list['jobs']) ? list['jobs'] : []; - let jobIds = pickMatchingJobIds(jobs, opts.indexPattern, opts.srcIpField, opts.bytesField); + let jobIds = pickMatchingJobIds(jobs, opts.indexPattern, opts.srcIpField, opts.bytesField, log); if (jobIds.length === 0) { const span = bucketSpan(opts.pollIntervalSeconds); diff --git a/src/insights/elasticsearchDetections.ts b/src/insights/elasticsearchDetections.ts index 59ae4d9..87977f6 100644 --- a/src/insights/elasticsearchDetections.ts +++ b/src/insights/elasticsearchDetections.ts @@ -3,13 +3,21 @@ import type { ElasticsearchMlClient } from '../elasticsearch/mlAnomalyLifecycle. import type { DetectionFetchResult } from './opensearchDetections.js'; import { getNumber, getString, isRecord } from '../util/guards.js'; +function mlRecordTimestampIso(rec: Record): string { + const v = rec['timestamp']; + if (typeof v === 'number' && Number.isFinite(v)) return new Date(v).toISOString(); + const s = getString(v); + if (!s) return ''; + const ms = Date.parse(s); + return Number.isNaN(ms) ? s : new Date(ms).toISOString(); +} + function recordToFinding(jobId: string, rec: Record): Finding { const score = Math.max(getNumber(rec['record_score']), getNumber(rec['initial_record_score'])); const severity = score >= 90 ? 'high' : score >= 50 ? 'medium' : 'low'; const over = getString(rec['over_field_value']); - const ts = getString(rec['timestamp']); - const id = `es-ml:${jobId}:${ts}:${over}:${score.toFixed(2)}`; - const t = ts || new Date(0).toISOString(); + const t = mlRecordTimestampIso(rec) || new Date(0).toISOString(); + const id = `es-ml:${jobId}:${t}:${over}:${score.toFixed(2)}`; const evidence: Record = { jobId, source: rec }; if (over) evidence['contributingSrcIps'] = [over]; return { diff --git a/src/insights/opensearchDetections.ts b/src/insights/opensearchDetections.ts index 225939b..d0725f9 100644 --- a/src/insights/opensearchDetections.ts +++ b/src/insights/opensearchDetections.ts @@ -25,7 +25,7 @@ const detectionsLog = getLogger({ component: 'insights.opensearchDetections' }); function shardsTotal(body: unknown): number { const normalized = typeof body === 'string' - ? parseJsonOrNull({ raw: body, context: 'opensearch.search.body_shards' }) + ? parseJsonOrNull({ raw: body, context: 'opensearch.search.body_shards', log: detectionsLog }) : body; if (!normalized || typeof normalized !== 'object') return 0; const shards = (normalized as Record)['_shards']; diff --git a/src/opensearch/adLifecycle.ts b/src/opensearch/adLifecycle.ts index c4012fa..5bc1085 100644 --- a/src/opensearch/adLifecycle.ts +++ b/src/opensearch/adLifecycle.ts @@ -53,6 +53,7 @@ function categoryIncludes(det: Record, srcIpField: string): boo } function featureSumsBytes(det: Record, bytesField: string): boolean { + // Heuristic: not a full aggregation AST; enough to match typical Kaytoo/vendor egress detectors. const attrs = det['feature_attributes']; if (!Array.isArray(attrs)) return false; for (const a of attrs) { @@ -116,13 +117,21 @@ function pickEgressDetectors( indexPattern: string, srcIpField: string, bytesField: string, + log: ReturnType, ): string[] { const matches = detectors.filter((d) => detectorMatchesEgressShape(d.raw, indexPattern, srcIpField, bytesField)); if (matches.length === 0) return []; - const first = [...matches].sort( + const sorted = [...matches].sort( (a, b) => kaytooNameRank(a.raw) - kaytooNameRank(b.raw) || a.id.localeCompare(b.id), - )[0]!; - return [first.id]; + ); + const chosen = sorted[0]!; + if (sorted.length > 1) { + log.debug( + { chosenDetectorId: chosen.id, otherMatchingDetectorIds: sorted.slice(1).map((d) => d.id) }, + 'Multiple AD detectors matched egress shape; using deterministic tie-break.', + ); + } + return [chosen.id]; } function buildCreateDetectorBody(opts: { @@ -179,7 +188,7 @@ export async function ensureOpenSearchAnomalyPipeline(opts: { } const listed = parseDetectorList(searchRes.body); - let detectorIds = pickEgressDetectors(listed, opts.indexPattern, opts.srcIpField, opts.bytesField); + let detectorIds = pickEgressDetectors(listed, opts.indexPattern, opts.srcIpField, opts.bytesField, log); if (detectorIds.length === 0) { const createRes = await osTransport(opts.client, 'POST', '/_plugins/_anomaly_detection/detectors', buildCreateDetectorBody(opts)); @@ -201,7 +210,7 @@ export async function ensureOpenSearchAnomalyPipeline(opts: { size: 10, })).body, ); - detectorIds = pickEgressDetectors(relist, opts.indexPattern, opts.srcIpField, opts.bytesField); + detectorIds = pickEgressDetectors(relist, opts.indexPattern, opts.srcIpField, opts.bytesField, log); } } diff --git a/test/elasticsearch.mlAnomalyLifecycle.test.ts b/test/elasticsearch.mlAnomalyLifecycle.test.ts index 7440fbd..639c3a1 100644 --- a/test/elasticsearch.mlAnomalyLifecycle.test.ts +++ b/test/elasticsearch.mlAnomalyLifecycle.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it, vi } from 'vitest'; +import * as logging from '../src/logging/logger.js'; import { mlJobMatchesEgressShape, ensureElasticsearchMlAnomalyPipeline } from '../src/elasticsearch/mlAnomalyLifecycle.js'; describe('mlJobMatchesEgressShape', () => { @@ -94,6 +95,53 @@ describe('ensureElasticsearchMlAnomalyPipeline', () => { expect(ml.putJob).not.toHaveBeenCalled(); }); + it('logs debug when multiple ML jobs match egress shape', async () => { + const debug = vi.fn(); + vi.spyOn(logging, 'getLogger').mockReturnValue({ + debug, + warn: vi.fn(), + info: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + trace: vi.fn(), + child: vi.fn(), + } as never); + const jobShape = { + data_description: { time_field: '@timestamp' }, + analysis_config: { + detectors: [{ function: 'sum', field_name: 'flow.bytes', over_field_name: 'flow.client.ip.addr' }], + }, + datafeed_config: { indices: ['flow-*'] }, + }; + const ml = { + getJobs: vi.fn().mockResolvedValue({ + jobs: [ + { job_id: 'z-other', ...jobShape }, + { job_id: 'a-other', ...jobShape }, + ], + }), + openJob: vi.fn().mockResolvedValue({}), + getDatafeeds: vi.fn().mockResolvedValue({ datafeeds: [] }), + startDatafeed: vi.fn(), + putJob: vi.fn(), + putDatafeed: vi.fn(), + }; + const r = await ensureElasticsearchMlAnomalyPipeline({ + client: { ml } as never, + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(r.elasticsearch?.jobIds).toEqual(['a-other']); + expect(debug).toHaveBeenCalledWith( + expect.objectContaining({ chosenMlJobId: 'a-other', otherMatchingMlJobIds: ['z-other'] }), + expect.stringMatching(/tie-break/i), + ); + vi.restoreAllMocks(); + }); + it('returns ok when getDatafeeds fails during adopted job startup', async () => { const ml = { getJobs: vi.fn().mockResolvedValue({ @@ -146,6 +194,28 @@ describe('ensureElasticsearchMlAnomalyPipeline', () => { expect(ml.putDatafeed).toHaveBeenCalled(); }); + it('treats putJob ResponseError-shaped resource_already_exists as non-fatal', async () => { + const ml = { + getJobs: vi.fn().mockResolvedValue({ jobs: [] }), + putJob: vi.fn().mockRejectedValue({ + meta: { body: { error: { type: 'resource_already_exists_exception', reason: 'duplicate' } } }, + }), + putDatafeed: vi.fn().mockResolvedValue({}), + openJob: vi.fn().mockResolvedValue({}), + getDatafeeds: vi.fn().mockResolvedValue({ datafeeds: [{ datafeed_id: 'kaytoo-flow-egress-by-src-datafeed' }] }), + startDatafeed: vi.fn().mockResolvedValue({}), + }; + const r = await ensureElasticsearchMlAnomalyPipeline({ + client: { ml } as never, + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(ml.putDatafeed).toHaveBeenCalled(); + }); + it('treats putJob resource_already_exists as non-fatal', async () => { const ml = { getJobs: vi.fn().mockResolvedValue({ jobs: [] }), @@ -166,6 +236,28 @@ describe('ensureElasticsearchMlAnomalyPipeline', () => { expect(ml.putDatafeed).toHaveBeenCalled(); }); + it('treats putDatafeed body.error resource_already_exists as non-fatal', async () => { + const ml = { + getJobs: vi.fn().mockResolvedValue({ jobs: [] }), + putJob: vi.fn().mockResolvedValue({}), + putDatafeed: vi.fn().mockRejectedValue({ + body: { error: { type: 'resource_already_exists_exception' } }, + }), + openJob: vi.fn().mockResolvedValue({}), + getDatafeeds: vi.fn().mockResolvedValue({ datafeeds: [{ datafeed_id: 'kaytoo-flow-egress-by-src-datafeed' }] }), + startDatafeed: vi.fn().mockResolvedValue({}), + }; + const r = await ensureElasticsearchMlAnomalyPipeline({ + client: { ml } as never, + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(true); + expect(ml.openJob).toHaveBeenCalled(); + }); + it('treats putDatafeed resource_already_exists as non-fatal', async () => { const ml = { getJobs: vi.fn().mockResolvedValue({ jobs: [] }), diff --git a/test/elasticsearchDetections.test.ts b/test/elasticsearchDetections.test.ts index 061e9e7..5fcb19f 100644 --- a/test/elasticsearchDetections.test.ts +++ b/test/elasticsearchDetections.test.ts @@ -37,6 +37,8 @@ describe('fetchElasticsearchMlAnomalyFindings', () => { expect(r.findings[0]!.kind).toBe('elasticsearch_ml_anomaly'); expect(r.findings[0]!.severity).toBe('high'); expect(r.findings[0]!.evidence['contributingSrcIps']).toEqual(['10.0.0.1']); + expect(r.findings[0]!.window.from).toBe('2023-11-14T22:13:20.000Z'); + expect(r.findings[0]!.id).toContain('2023-11-14T22:13:20.000Z'); }); it('returns ok false when getRecords throws', async () => { diff --git a/test/opensearch.adLifecycle.test.ts b/test/opensearch.adLifecycle.test.ts index 91505c5..3413e00 100644 --- a/test/opensearch.adLifecycle.test.ts +++ b/test/opensearch.adLifecycle.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it, vi } from 'vitest'; +import * as logging from '../src/logging/logger.js'; import { detectorMatchesEgressShape, ensureOpenSearchAnomalyPipeline } from '../src/opensearch/adLifecycle.js'; describe('detectorMatchesEgressShape', () => { @@ -316,6 +317,52 @@ describe('ensureOpenSearchAnomalyPipeline', () => { expect(r.opensearch?.detectorIds).toEqual(['kay']); }); + it('logs debug when multiple AD detectors match egress shape', async () => { + const debug = vi.fn(); + vi.spyOn(logging, 'getLogger').mockReturnValue({ + debug, + warn: vi.fn(), + info: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + trace: vi.fn(), + child: vi.fn(), + } as never); + const shaped = { + time_field: '@timestamp', + indices: ['flow-*'], + category_field: ['flow.client.ip.addr'], + feature_attributes: [{ aggregation_query: { k: { sum: { field: 'flow.bytes' } } } }], + }; + const transport = vi + .fn() + .mockResolvedValueOnce({ + statusCode: 200, + body: { + hits: { + hits: [ + { _id: 'z-det', _source: { name: 'z', ...shaped } }, + { _id: 'a-det', _source: { name: 'a', ...shaped } }, + ], + }, + }, + }) + .mockResolvedValue({ statusCode: 200, body: {} }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.opensearch?.detectorIds).toEqual(['a-det']); + expect(debug).toHaveBeenCalledWith( + expect.objectContaining({ chosenDetectorId: 'a-det', otherMatchingDetectorIds: ['z-det'] }), + expect.stringMatching(/tie-break/i), + ); + vi.restoreAllMocks(); + }); + it('returns not ok when create omits id and relist finds no detector', async () => { const transport = vi .fn() From 250d783b757762b98714fcd8f28b56718efedcb2 Mon Sep 17 00:00:00 2001 From: Madison Grubb Date: Wed, 13 May 2026 23:03:33 -0400 Subject: [PATCH 3/5] refactor(insights): prefer const over let in native anomaly paths ## Changed - ML pipeline: extract create helper; derive jobIds without reassignment. - OpenSearch AD: extract egressDetectorIdsEnsure; single const detectorIds. - Insight engine: resolve native anomaly context via async IIFE; poll timer/inFlight held in one const object. - ML job tie-break: sort a copy of matches instead of mutating the array. --- src/elasticsearch/mlAnomalyLifecycle.ts | 104 +++++++++++++----------- src/insights/engine.ts | 48 +++++------ src/opensearch/adLifecycle.ts | 73 +++++++++++------ 3 files changed, 131 insertions(+), 94 deletions(-) diff --git a/src/elasticsearch/mlAnomalyLifecycle.ts b/src/elasticsearch/mlAnomalyLifecycle.ts index 687cb45..c5a1f57 100644 --- a/src/elasticsearch/mlAnomalyLifecycle.ts +++ b/src/elasticsearch/mlAnomalyLifecycle.ts @@ -106,17 +106,67 @@ function pickMatchingJobIds( matches.push({ id, kaytoo: id === KAYTOO_ES_JOB_ID ? 0 : 1 }); } if (matches.length === 0) return []; - matches.sort((a, b) => a.kaytoo - b.kaytoo || a.id.localeCompare(b.id)); - const chosen = matches[0]!.id; - if (matches.length > 1) { + const sorted = [...matches].sort((a, b) => a.kaytoo - b.kaytoo || a.id.localeCompare(b.id)); + const chosen = sorted[0]!.id; + if (sorted.length > 1) { log.debug( - { chosenMlJobId: chosen, otherMatchingMlJobIds: matches.slice(1).map((m) => m.id) }, + { chosenMlJobId: chosen, otherMatchingMlJobIds: sorted.slice(1).map((m) => m.id) }, 'Multiple ML jobs matched egress shape; using deterministic tie-break.', ); } return [chosen]; } +async function createKaytooMlJobAndDatafeed( + opts: { + client: ElasticsearchMlClient; + indexPattern: string; + srcIpField: string; + bytesField: string; + pollIntervalSeconds: number; + }, + log: ReturnType, +): Promise { + const span = bucketSpan(opts.pollIntervalSeconds); + try { + await opts.client.ml.putJob({ + job_id: KAYTOO_ES_JOB_ID, + description: 'Kaytoo-managed flow egress — sum bytes by source IP.', + analysis_config: { + bucket_span: span, + detectors: [{ function: 'sum', field_name: opts.bytesField, over_field_name: opts.srcIpField }], + }, + data_description: { time_field: '@timestamp' }, + } as never); + } catch (e) { + if (!resourceAlreadyExists(e)) { + log.warn({ ...logErr(e) }, 'Elasticsearch ML putJob failed'); + return { + ok: false, + hasScopedSources: false, + warning: 'Could not create Kaytoo Elasticsearch ML job (ML unavailable or insufficient permissions).', + }; + } + } + + try { + await opts.client.ml.putDatafeed({ + datafeed_id: KAYTOO_ES_DATAFEED_ID, + job_id: KAYTOO_ES_JOB_ID, + indices: [opts.indexPattern], + query: { match_all: {} }, + scroll_size: 1000, + } as never); + } catch (e) { + if (!resourceAlreadyExists(e)) { + log.warn({ ...logErr(e) }, 'Elasticsearch ML putDatafeed failed'); + return { ok: false, hasScopedSources: false, warning: 'Could not create Kaytoo ML datafeed.' }; + } + } + + return null; +} + export async function ensureElasticsearchMlAnomalyPipeline(opts: { client: ElasticsearchMlClient; indexPattern: string; @@ -128,48 +178,10 @@ export async function ensureElasticsearchMlAnomalyPipeline(opts: { try { const list = await opts.client.ml.getJobs({}); const jobs = isRecord(list) && Array.isArray(list['jobs']) ? list['jobs'] : []; - let jobIds = pickMatchingJobIds(jobs, opts.indexPattern, opts.srcIpField, opts.bytesField, log); - - if (jobIds.length === 0) { - const span = bucketSpan(opts.pollIntervalSeconds); - try { - await opts.client.ml.putJob({ - job_id: KAYTOO_ES_JOB_ID, - description: 'Kaytoo-managed flow egress — sum bytes by source IP.', - analysis_config: { - bucket_span: span, - detectors: [{ function: 'sum', field_name: opts.bytesField, over_field_name: opts.srcIpField }], - }, - data_description: { time_field: '@timestamp' }, - } as never); - } catch (e) { - if (!resourceAlreadyExists(e)) { - log.warn({ ...logErr(e) }, 'Elasticsearch ML putJob failed'); - return { - ok: false, - hasScopedSources: false, - warning: 'Could not create Kaytoo Elasticsearch ML job (ML unavailable or insufficient permissions).', - }; - } - } - - try { - await opts.client.ml.putDatafeed({ - datafeed_id: KAYTOO_ES_DATAFEED_ID, - job_id: KAYTOO_ES_JOB_ID, - indices: [opts.indexPattern], - query: { match_all: {} }, - scroll_size: 1000, - } as never); - } catch (e) { - if (!resourceAlreadyExists(e)) { - log.warn({ ...logErr(e) }, 'Elasticsearch ML putDatafeed failed'); - return { ok: false, hasScopedSources: false, warning: 'Could not create Kaytoo ML datafeed.' }; - } - } - - jobIds = [KAYTOO_ES_JOB_ID]; - } + const adopted = pickMatchingJobIds(jobs, opts.indexPattern, opts.srcIpField, opts.bytesField, log); + const createFailed = adopted.length === 0 ? await createKaytooMlJobAndDatafeed(opts, log) : null; + if (createFailed) return createFailed; + const jobIds = adopted.length > 0 ? adopted : [KAYTOO_ES_JOB_ID]; for (const jid of jobIds) { try { diff --git a/src/insights/engine.ts b/src/insights/engine.ts index 1eede48..5cae377 100644 --- a/src/insights/engine.ts +++ b/src/insights/engine.ts @@ -57,22 +57,25 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi 'resolved opensearch field mapping', ); - let nativePipeline: NativeAnomalyPipelineResult = { ok: false, hasScopedSources: false }; - let esMlClient: ElasticsearchMlClient | null = null; - try { - const ensured = await ensureNativeAnomalyPipeline({ - backend: config.search.backend, - search: config.search, - searchClient: client, - indexPattern: config.search.indexPattern, - fields, - pollIntervalSeconds: config.behavior.pollIntervalSeconds, - }); - nativePipeline = ensured.pipeline; - esMlClient = ensured.esMlClient; - } catch (e) { - log.warn({ ...logErr(e) }, 'native anomaly pipeline ensure failed; continuing without scoped native anomaly'); - } + const nativeAnomaly = await (async (): Promise<{ + pipeline: NativeAnomalyPipelineResult; + esMlClient: ElasticsearchMlClient | null; + }> => { + try { + return await ensureNativeAnomalyPipeline({ + backend: config.search.backend, + search: config.search, + searchClient: client, + indexPattern: config.search.indexPattern, + fields, + pollIntervalSeconds: config.behavior.pollIntervalSeconds, + }); + } catch (e) { + log.warn({ ...logErr(e) }, 'native anomaly pipeline ensure failed; continuing without scoped native anomaly'); + return { pipeline: { ok: false, hasScopedSources: false }, esMlClient: null }; + } + })(); + const { pipeline: nativePipeline, esMlClient } = nativeAnomaly; const llm = createOpenAiCompatClient({ ...config.llm, @@ -80,19 +83,18 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi }); const dedupe = new DedupeStore(config.behavior.dedupeTtlSeconds * 1000); const shouldWarnDegraded = createThrottle(10 * 60_000); - let timer: NodeJS.Timeout | undefined; - let inFlight = false; + const pollLoop = { timer: undefined as NodeJS.Timeout | undefined, inFlight: false }; const nativePipelineReady = nativePipeline.ok === true && nativePipeline.hasScopedSources === true; const scheduleNext = (): void => { if (controller.signal.aborted) return; - timer = setTimeout(() => void pollOnce(), config.behavior.pollIntervalSeconds * 1000); + pollLoop.timer = setTimeout(() => void pollOnce(), config.behavior.pollIntervalSeconds * 1000); }; async function pollOnce(): Promise { - if (inFlight) return; - inFlight = true; + if (pollLoop.inFlight) return; + pollLoop.inFlight = true; return runWithLogContextAsync({ pollId: randomUUID() }, async () => { try { if (controller.signal.aborted) return; @@ -210,7 +212,7 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi } catch (e) { log.error({ ...logErr(e) }, 'poll failed'); } finally { - inFlight = false; + pollLoop.inFlight = false; scheduleNext(); } }); @@ -258,7 +260,7 @@ export async function startInsightEngine(opts: { config: KaytooConfig; insightSi return { stop: () => { controller.abort(); - if (timer) clearTimeout(timer); + if (pollLoop.timer) clearTimeout(pollLoop.timer); }, }; } diff --git a/src/opensearch/adLifecycle.ts b/src/opensearch/adLifecycle.ts index 5bc1085..aa70a9a 100644 --- a/src/opensearch/adLifecycle.ts +++ b/src/opensearch/adLifecycle.ts @@ -163,6 +163,51 @@ function buildCreateDetectorBody(opts: { }; } +type EgressDetectorEnsure = { ok: true; detectorIds: string[] } | { ok: false; result: NativeAnomalyPipelineResult }; + +async function egressDetectorIdsEnsure( + opts: { + client: SearchClient; + indexPattern: string; + srcIpField: string; + bytesField: string; + pollIntervalSeconds: number; + }, + listed: Array<{ id: string; raw: Record }>, + log: ReturnType, +): Promise { + const adopted = pickEgressDetectors(listed, opts.indexPattern, opts.srcIpField, opts.bytesField, log); + if (adopted.length > 0) return { ok: true, detectorIds: adopted }; + + const createRes = await osTransport(opts.client, 'POST', '/_plugins/_anomaly_detection/detectors', buildCreateDetectorBody(opts)); + if (createRes.statusCode && createRes.statusCode >= 400) { + log.warn({ statusCode: createRes.statusCode, body: createRes.body }, 'OpenSearch AD create detector failed'); + return { + ok: false, + result: { + ok: false, + hasScopedSources: false, + warning: 'Could not create Kaytoo OpenSearch anomaly detector (insufficient permissions or validation error).', + }, + }; + } + const created = isRecord(createRes.body) ? createRes.body : {}; + const newId = getString(created['_id']); + if (newId) return { ok: true, detectorIds: [newId] }; + + const relistBody = ( + await osTransport(opts.client, 'POST', '/_plugins/_anomaly_detection/detectors/_search', { + query: { term: { name: KAYTOO_OS_DETECTOR_NAME } }, + size: 10, + }) + ).body; + const relist = parseDetectorList(relistBody); + return { + ok: true, + detectorIds: pickEgressDetectors(relist, opts.indexPattern, opts.srcIpField, opts.bytesField, log), + }; +} + export async function ensureOpenSearchAnomalyPipeline(opts: { client: SearchClient; indexPattern: string; @@ -188,31 +233,9 @@ export async function ensureOpenSearchAnomalyPipeline(opts: { } const listed = parseDetectorList(searchRes.body); - let detectorIds = pickEgressDetectors(listed, opts.indexPattern, opts.srcIpField, opts.bytesField, log); - - if (detectorIds.length === 0) { - const createRes = await osTransport(opts.client, 'POST', '/_plugins/_anomaly_detection/detectors', buildCreateDetectorBody(opts)); - if (createRes.statusCode && createRes.statusCode >= 400) { - log.warn({ statusCode: createRes.statusCode, body: createRes.body }, 'OpenSearch AD create detector failed'); - return { - ok: false, - hasScopedSources: false, - warning: 'Could not create Kaytoo OpenSearch anomaly detector (insufficient permissions or validation error).', - }; - } - const created = isRecord(createRes.body) ? createRes.body : {}; - const newId = getString(created['_id']); - if (newId) detectorIds = [newId]; - else { - const relist = parseDetectorList( - (await osTransport(opts.client, 'POST', '/_plugins/_anomaly_detection/detectors/_search', { - query: { term: { name: KAYTOO_OS_DETECTOR_NAME } }, - size: 10, - })).body, - ); - detectorIds = pickEgressDetectors(relist, opts.indexPattern, opts.srcIpField, opts.bytesField, log); - } - } + const ensured = await egressDetectorIdsEnsure(opts, listed, log); + if (!ensured.ok) return ensured.result; + const { detectorIds } = ensured; if (detectorIds.length === 0) { return { ok: false, hasScopedSources: false, warning: 'OpenSearch AD: no detector id after ensure step.' }; From f4827107cabc597fdbb70be6b9fb85b28a64e14d Mon Sep 17 00:00:00 2001 From: Madison Grubb Date: Thu, 14 May 2026 09:22:16 -0400 Subject: [PATCH 4/5] fix(insights): ML timestamp seconds heuristic; lock AD relist tests ## Changed - Numeric ML record timestamps: assume ms from API; if value is below 1e12, treat as epoch seconds before converting to ISO. ## Fixed - OpenSearch AD relist failure path: assert three transport calls for empty relist; add case where relist returns hits that fail egress match. --- src/insights/elasticsearchDetections.ts | 6 +++++- test/elasticsearchDetections.test.ts | 15 +++++++++++++ test/opensearch.adLifecycle.test.ts | 28 +++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/insights/elasticsearchDetections.ts b/src/insights/elasticsearchDetections.ts index 87977f6..33bbb1a 100644 --- a/src/insights/elasticsearchDetections.ts +++ b/src/insights/elasticsearchDetections.ts @@ -5,7 +5,11 @@ import { getNumber, getString, isRecord } from '../util/guards.js'; function mlRecordTimestampIso(rec: Record): string { const v = rec['timestamp']; - if (typeof v === 'number' && Number.isFinite(v)) return new Date(v).toISOString(); + if (typeof v === 'number' && Number.isFinite(v)) { + // ML APIs normally return epoch ms; values below ~1e12 are treated as seconds. + const ms = v < 1e12 ? v * 1000 : v; + return new Date(ms).toISOString(); + } const s = getString(v); if (!s) return ''; const ms = Date.parse(s); diff --git a/test/elasticsearchDetections.test.ts b/test/elasticsearchDetections.test.ts index 5fcb19f..a5d9a7a 100644 --- a/test/elasticsearchDetections.test.ts +++ b/test/elasticsearchDetections.test.ts @@ -41,6 +41,21 @@ describe('fetchElasticsearchMlAnomalyFindings', () => { expect(r.findings[0]!.id).toContain('2023-11-14T22:13:20.000Z'); }); + it('treats small numeric timestamp as epoch seconds', async () => { + const ml = { + getRecords: vi.fn().mockResolvedValue({ + records: [{ record_score: 50, over_field_value: '10.0.0.1', timestamp: 1_700_000_000 }], + }), + }; + const r = await fetchElasticsearchMlAnomalyFindings({ + client: { ml } as never, + jobIds: ['j'], + now: new Date('2024-01-15T12:00:00Z'), + minutesBack: 60, + }); + expect(r.findings[0]!.window.from).toBe('2023-11-14T22:13:20.000Z'); + }); + it('returns ok false when getRecords throws', async () => { const ml = { getRecords: vi.fn().mockRejectedValue(new Error('ml down')) }; const r = await fetchElasticsearchMlAnomalyFindings({ diff --git a/test/opensearch.adLifecycle.test.ts b/test/opensearch.adLifecycle.test.ts index 3413e00..7cddb26 100644 --- a/test/opensearch.adLifecycle.test.ts +++ b/test/opensearch.adLifecycle.test.ts @@ -378,5 +378,33 @@ describe('ensureOpenSearchAnomalyPipeline', () => { }); expect(r.ok).toBe(false); expect(r.warning).toMatch(/no detector id/i); + expect(transport).toHaveBeenCalledTimes(3); + }); + + it('returns not ok when create omits id and relist has no egress-shaped detectors', async () => { + const shapedWrongIndex = { + name: 'Kaytoo flow egress by source', + time_field: '@timestamp', + indices: ['other-index-*'], + category_field: ['flow.client.ip.addr'], + feature_attributes: [{ aggregation_query: { k: { sum: { field: 'flow.bytes' } } } }], + }; + const transport = vi + .fn() + .mockResolvedValueOnce({ statusCode: 200, body: { hits: { hits: [] } } }) + .mockResolvedValueOnce({ statusCode: 201, body: {} }) + .mockResolvedValueOnce({ + statusCode: 200, + body: { hits: { hits: [{ _id: 'rel-orphan', _source: shapedWrongIndex }] } }, + }); + const r = await ensureOpenSearchAnomalyPipeline({ + client: clientWithTransport(transport), + indexPattern: 'flow-*', + srcIpField: 'flow.client.ip.addr', + bytesField: 'flow.bytes', + pollIntervalSeconds: 300, + }); + expect(r.ok).toBe(false); + expect(r.warning).toMatch(/no detector id/i); }); }); From ebd3d146790b7b3e6378039370660d507f1038d6 Mon Sep 17 00:00:00 2001 From: Madison Grubb Date: Thu, 14 May 2026 09:29:57 -0400 Subject: [PATCH 5/5] test(e2e): assert Kaytoo AD detector in verify; note console anomaly logs ## Added - verify: second AD list call requires Kaytoo egress detector name in JSON (confirms native seed/adopt against OpenSearch). - verify: optional log line when Anomaly appears (graded AD in console path). ## Changed - e2e README: document the new native AD assertion and optional Anomaly hint. --- e2e/README.md | 2 +- e2e/cli.sh | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/e2e/README.md b/e2e/README.md index b07d138..18d5083 100644 --- a/e2e/README.md +++ b/e2e/README.md @@ -84,7 +84,7 @@ export KUBECONFIG="$(pwd)/e2e/.generated/kubeconfig-kind-kaytoo-e2e" npm run e2e:verify ``` -`verify` starts a temporary chat port-forward if needed. +`verify` starts a temporary chat port-forward if needed. It also checks **OpenSearch Anomaly Detection** (`POST .../_plugins/_anomaly_detection/detectors/_search` returns 200), asserts the **Kaytoo flow egress** detector appears in that listing (native seed/adopt), confirms Kaytoo did **not** log AD plugin `404`, waits one poll window, then requires one of: structured log **`insight_post`** (console sink), **`posted findings`**, or **`skipping heuristic detectors`** (native pipeline idle + healthy empty). With `kaytooOutput: console` in e2e values, proactive text appears as JSON logs (`insightText` on `insight_post`), not Slack/Matrix. If logs contain **`Anomaly`**, verify prints an optional note (typical when a graded `opensearch_anomaly` finding was summarized). ## Host Kaytoo (optional) diff --git a/e2e/cli.sh b/e2e/cli.sh index f486097..d5bfab7 100755 --- a/e2e/cli.sh +++ b/e2e/cli.sh @@ -272,6 +272,47 @@ cmd_verify() { e2e_log "expect topTalkersByBytes in logs" logs_grep 'agent tool finished' || { kubectl -n "$ns" logs "$pod" -c kaytoo --tail=120; exit 1; } logs_grep '"tool":"topTalkersByBytes"' || { kubectl -n "$ns" logs "$pod" -c kaytoo --tail=120; exit 1; } + + e2e_log "OpenSearch Anomaly Detection API (expect HTTP 200, not 404)" + local ad_http os_base="${OS_URL%/}" + ad_http="$(curl -k -sS -o /dev/null -w '%{http_code}' -u "${OS_USER}:${OS_PASS}" \ + -X POST "${os_base}/_plugins/_anomaly_detection/detectors/_search" \ + -H 'Content-Type: application/json' \ + -d '{"query":{"match_all":{}},"size":1}' 2>/dev/null || echo "000")" + [[ "$ad_http" == "200" ]] || e2e_die "AD detectors/_search returned HTTP ${ad_http} (plugin missing or auth?)" + + e2e_log "OpenSearch AD: expect Kaytoo egress detector in list (seed/adopt)" + local ad_json + ad_json="$(curl -k -sS -u "${OS_USER}:${OS_PASS}" \ + -X POST "${os_base}/_plugins/_anomaly_detection/detectors/_search" \ + -H 'Content-Type: application/json' \ + -d '{"query":{"match_all":{}},"size":50}' 2>/dev/null || true)" + echo "$ad_json" | grep -q 'Kaytoo flow egress' || + e2e_die "AD detector list missing Kaytoo egress detector (native pipeline seed/adopt)" + + e2e_log "Kaytoo logs: native AD must not report plugin 404" + if logs_grep 'OpenSearch Anomaly Detection plugin not available (404)' 25000; then + kubectl -n "$ns" logs "$pod" -c kaytoo --tail=200 >&2 + e2e_die "Kaytoo logged AD plugin unavailable (404)" + fi + + e2e_log "wait for scheduled insight poll (e2e uses pollIntervalSeconds=15)" + sleep 20 + + e2e_log "Kaytoo logs: console insight path (insight_post, posted findings, or heuristic skip when native idle)" + if logs_grep 'insight_post' 25000 || logs_grep '"msg":"posted findings"' 25000 || logs_grep 'skipping heuristic detectors' 25000; then + e2e_log "insight engine activity ok (console insight_post, posted findings, or native-idle heuristic skip)" + else + kubectl -n "$ns" logs "$pod" -c kaytoo --tail=200 >&2 + e2e_die "no insight_post, posted findings, or heuristic-skip in recent logs (see tail above)" + fi + + if logs_grep 'Anomaly' 25000; then + e2e_log "optional: Anomaly wording in logs (likely opensearch_anomaly insight path)" + else + e2e_log "optional: no Anomaly string in recent logs (normal if no graded AD hits this window)" + fi + e2e_log "OK verify" }