Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"lint": "echo \"(lint stub — biome/eslint TBD)\""
},
"dependencies": {
"@smooai/fetch": "^3.3.10",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/auto-instrumentations-node": "^0.50.0",
"@opentelemetry/core": "^1.30.0",
Expand Down
28 changes: 28 additions & 0 deletions packages/core/src/__tests__/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,32 @@ describe('Transport', () => {
expect(beacon).toHaveBeenCalledOnce();
expect(t._queueSize()).toBe(0);
});

it('routes the flush through the injected resilient fetcher (not global fetch)', async () => {
const fetcher = vi.fn().mockResolvedValue({ ok: true, status: 202 });
const t = new Transport({ dsn: 'https://example.com/ingest', maxBatchSize: 1, flushIntervalMs: 1000 }, { canBeacon: false, fetcher });
t.enqueue(evt('a'));
await vi.runOnlyPendingTimersAsync();
await Promise.resolve();
expect(fetcher).toHaveBeenCalledOnce();
// Global fetch must NOT be used when a fetcher is injected.
expect(fetchMock).not.toHaveBeenCalled();
const [url, init] = fetcher.mock.calls[0]!;
expect(url).toBe('https://example.com/ingest');
expect(init.method).toBe('POST');
expect(init.keepalive).toBe(true);
expect(JSON.parse(init.body).events).toHaveLength(1);
});

it('re-queues the batch when the resilient fetcher throws (network/non-2xx after retries)', async () => {
const fetcher = vi.fn().mockRejectedValue(new Error('circuit open'));
const t = new Transport({ dsn: 'https://example.com', maxBatchSize: 1, flushIntervalMs: 1000 }, { canBeacon: false, fetcher });
t.enqueue(evt('a'));
await vi.runOnlyPendingTimersAsync();
await Promise.resolve();
expect(fetcher).toHaveBeenCalled();
// The event survived for the next flush attempt — it was put back on
// the queue after the throw rather than being dropped.
expect(t._queueSize()).toBe(1);
});
});
7 changes: 7 additions & 0 deletions packages/core/src/browser/transport.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import smooFetch from '@smooai/fetch/browser';
import { Transport } from '../transport';
import type { ClientOptions } from '../types';

/**
* Build a browser-flavored Transport: keepalive fetch, `sendBeacon` on
* `pagehide`, bound to `visibilitychange` for the modern browser-lifecycle path.
*
* SMOODEV-2026: the timer/batch flush goes through `@smooai/fetch/browser`
* (retries/timeouts/circuit-breaking). The `sendBeacon` page-unload path is
* left untouched — it's a browser primitive, not a fetch, and is the only
* thing that reliably delivers during unload.
*/
export function makeBrowserTransport(opts: ClientOptions): Transport {
const adapter = {
canBeacon: typeof navigator !== 'undefined' && typeof navigator.sendBeacon === 'function',
fetcher: smooFetch,
beacon: typeof navigator !== 'undefined' && typeof navigator.sendBeacon === 'function' ? navigator.sendBeacon.bind(navigator) : undefined,
bindLifecycle: (onPageHide: () => void) => {
if (typeof window === 'undefined') return;
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/node/transport.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import smooFetch from '@smooai/fetch';
import { Transport } from '../transport';
import type { ClientOptions } from '../types';

Expand All @@ -11,10 +12,15 @@ import type { ClientOptions } from '../types';
* SMOODEV-1148: this gets registered in Node init so `Client.captureException`
* fans out to BOTH the OTel-native captureHandler (span events) AND the
* webhook POST (errorEvents table → Errors dashboard).
*
* SMOODEV-2026: the batch flush goes through `@smooai/fetch` (node entry)
* so webhook delivery gets exponential-backoff retries, timeouts, and
* circuit-breaking for free.
*/
export function makeNodeTransport(opts: ClientOptions): Transport {
const adapter = {
canBeacon: false,
fetcher: smooFetch,
};
return new Transport(
{
Expand Down
106 changes: 106 additions & 0 deletions packages/core/src/otel/__tests__/auth-injecting-exporter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { SpanKind, SpanStatusCode, TraceFlags } from '@opentelemetry/api';
import { ExportResultCode } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import type { ReadableSpan } from '@opentelemetry/sdk-trace-base';
import { describe, expect, it, vi } from 'vitest';
import type { TokenProvider } from '../../auth/token-provider';
import { AuthInjectingTraceExporter } from '../auth-injecting-exporter';

// A serializable ReadableSpan stub — JsonTraceSerializer reads name, span
// context, kind, timing, attributes, status, resource, and scope, so we
// supply all of them. The exporter then attempts a real export call.
const fakeSpan = {
name: 'test-span',
kind: SpanKind.INTERNAL,
spanContext: () => ({ traceId: '0af7651916cd43dd8448eb211c80319c', spanId: 'b7ad6b7169203331', traceFlags: TraceFlags.SAMPLED }),
parentSpanId: undefined,
startTime: [1609459200, 0] as [number, number],
endTime: [1609459200, 1_000_000] as [number, number],
status: { code: SpanStatusCode.OK },
attributes: {},
links: [],
events: [],
duration: [0, 1_000_000] as [number, number],
ended: true,
resource: Resource.empty(),
instrumentationLibrary: { name: 'test', version: '0.0.0' },
droppedAttributesCount: 0,
droppedEventsCount: 0,
droppedLinksCount: 0,
} as unknown as ReadableSpan;

function tokenProviderStub(): TokenProvider & { invalidate: ReturnType<typeof vi.fn> } {
const invalidate = vi.fn();
return {
getAccessToken: vi.fn().mockResolvedValue('tok-123'),
invalidate,
} as unknown as TokenProvider & { invalidate: ReturnType<typeof vi.fn> };
}

describe('AuthInjectingTraceExporter (transport via @smooai/fetch seam)', () => {
it('posts with a fresh Bearer token and reports SUCCESS on 2xx', async () => {
const fetcher = vi.fn().mockResolvedValue(new Response('', { status: 200 }));
const tp = tokenProviderStub();
const exporter = new AuthInjectingTraceExporter({ url: 'https://api.smoo.ai/v1/traces', tokenProvider: tp, fetcher });

const result = await new Promise<{ code: ExportResultCode }>((resolve) => {
exporter.export([fakeSpan], resolve);
});

expect(result.code).toBe(ExportResultCode.SUCCESS);
expect(fetcher).toHaveBeenCalledOnce();
const [url, init] = fetcher.mock.calls[0]!;
expect(url).toBe('https://api.smoo.ai/v1/traces');
expect(init.method).toBe('POST');
expect((init.headers as Record<string, string>).authorization).toBe('Bearer tok-123');
// Timeout is handed to @smooai/fetch via its `options` bag, not a
// hand-rolled AbortController.
expect((init as { options?: { timeout?: { timeoutMs: number } } }).options?.timeout?.timeoutMs).toBe(10_000);
expect((init as { signal?: unknown }).signal).toBeUndefined();
});

it('invalidates the token and retries once on 401, then succeeds', async () => {
const fetcher = vi
.fn()
.mockResolvedValueOnce(new Response('', { status: 401 }))
.mockResolvedValueOnce(new Response('', { status: 200 }));
const tp = tokenProviderStub();
const exporter = new AuthInjectingTraceExporter({ url: 'https://api.smoo.ai/v1/traces', tokenProvider: tp, fetcher });

const result = await new Promise<{ code: ExportResultCode }>((resolve) => {
exporter.export([fakeSpan], resolve);
});

expect(result.code).toBe(ExportResultCode.SUCCESS);
expect(tp.invalidate).toHaveBeenCalledOnce();
expect(fetcher).toHaveBeenCalledTimes(2);
});

it('reports FAILED when the response is non-ok (e.g. unwrapped from a thrown HTTPResponseError)', async () => {
// Simulates the default transport handing back the Response carried on
// a thrown @smooai/fetch HTTPResponseError after retries are exhausted.
const fetcher = vi.fn().mockResolvedValue(new Response('boom', { status: 503 }));
const tp = tokenProviderStub();
const exporter = new AuthInjectingTraceExporter({ url: 'https://api.smoo.ai/v1/traces', tokenProvider: tp, fetcher });

const result = await new Promise<{ code: ExportResultCode; error?: Error }>((resolve) => {
exporter.export([fakeSpan], resolve);
});

expect(result.code).toBe(ExportResultCode.FAILED);
expect(result.error?.message).toContain('503');
});

it('reports FAILED when the transport throws (network / circuit-open)', async () => {
const fetcher = vi.fn().mockRejectedValue(new Error('circuit open'));
const tp = tokenProviderStub();
const exporter = new AuthInjectingTraceExporter({ url: 'https://api.smoo.ai/v1/traces', tokenProvider: tp, fetcher });

const result = await new Promise<{ code: ExportResultCode; error?: Error }>((resolve) => {
exporter.export([fakeSpan], resolve);
});

expect(result.code).toBe(ExportResultCode.FAILED);
expect(result.error?.message).toContain('circuit open');
});
});
76 changes: 55 additions & 21 deletions packages/core/src/otel/auth-injecting-exporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,65 @@
* SMOODEV-1206.
*/

import smooFetch, { HTTPResponseError } from '@smooai/fetch';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import { JsonMetricsSerializer, JsonTraceSerializer } from '@opentelemetry/otlp-transformer';
import type { ResourceMetrics, PushMetricExporter } from '@opentelemetry/sdk-metrics';
import type { ReadableSpan, SpanExporter } from '@opentelemetry/sdk-trace-base';
import type { TokenProvider } from '../auth/token-provider';

/**
* Default exporter transport: `@smooai/fetch` owns retries (exponential
* backoff + jitter), timeouts, and circuit-breaking. It *throws*
* `HTTPResponseError` on a non-2xx response after exhausting retries; we
* unwrap that back into the underlying `Response` so the caller's existing
* `status === 401` / `!res.ok` control flow (auth-refresh-once + error
* surfacing) keeps working unchanged.
*
* SMOODEV-2026: this replaces the hand-rolled `AbortController` timeout — we
* deliberately let `@smooai/fetch` own the single transient-retry/timeout
* layer rather than stacking another on top (the BatchSpanProcessor /
* PeriodicExportingMetricReader already re-export on `FAILED`, so a second
* in-exporter retry loop would compound). The 401-invalidate-and-retry-once
* below is auth refresh, not transient retry, so it stays.
*/
async function defaultExporterFetch(url: string, init: RequestInit): Promise<Response> {
try {
// `@smooai/fetch` returns a `ResponseWithBody` (a superset of Response).
return (await smooFetch(url, init)) as unknown as Response;
} catch (error) {
// Non-2xx after retries throws — hand the Response back so the
// status-based branching upstream still applies. Re-throw anything
// that isn't an HTTP response (network/timeout/circuit-open) so it
// surfaces as an export failure.
if (error instanceof HTTPResponseError) {
return error.response as unknown as Response;
}
throw error;
}
}

interface AuthInjectingExporterOptions {
/** Fully-qualified OTLP endpoint, e.g. `https://api.smoo.ai/v1/traces`. */
url: string;
/** Token provider that holds the cached access_token. Consulted per-request. */
tokenProvider: TokenProvider;
/** Static headers to merge onto every request (e.g. user-agent). */
staticHeaders?: Record<string, string>;
/** Test seam — override fetch. */
fetcher?: typeof fetch;
/** Per-request timeout in ms. Default 10s. */
/**
* Override the transport. Defaults to `@smooai/fetch` (resilient).
* Primarily a test seam.
*/
fetcher?: (url: string, init: RequestInit) => Promise<Response>;
/** Per-request timeout in ms. Default 10s. Owned by `@smooai/fetch`. */
timeoutMs?: number;
}

abstract class BaseAuthInjectingExporter<Item> {
protected readonly url: string;
protected readonly tokenProvider: TokenProvider;
protected readonly staticHeaders: Record<string, string>;
protected readonly fetcher: typeof fetch;
protected readonly fetcher: (url: string, init: RequestInit) => Promise<Response>;
protected readonly timeoutMs: number;
private shutdownRequested = false;

Expand All @@ -62,7 +97,7 @@ abstract class BaseAuthInjectingExporter<Item> {
this.url = opts.url;
this.tokenProvider = opts.tokenProvider;
this.staticHeaders = opts.staticHeaders ?? {};
this.fetcher = opts.fetcher ?? fetch;
this.fetcher = opts.fetcher ?? defaultExporterFetch;
this.timeoutMs = opts.timeoutMs ?? 10_000;
}

Expand Down Expand Up @@ -102,22 +137,21 @@ abstract class BaseAuthInjectingExporter<Item> {
const body = new TextDecoder().decode(bodyBytes);
const attempt = async (): Promise<Response> => {
const token = await this.tokenProvider.getAccessToken();
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), this.timeoutMs);
try {
return await this.fetcher(this.url, {
method: 'POST',
headers: {
...this.staticHeaders,
authorization: `Bearer ${token}`,
'content-type': 'application/json',
},
body,
signal: controller.signal,
});
} finally {
clearTimeout(timer);
}
// Timeout is owned by the transport (`@smooai/fetch`) via the
// `options.timeout` passthrough below — no hand-rolled
// AbortController. The cast carries `@smooai/fetch`'s `options`
// bag on the standard `RequestInit`; a plain-`fetch` test seam
// simply ignores the extra field.
return await this.fetcher(this.url, {
method: 'POST',
headers: {
...this.staticHeaders,
authorization: `Bearer ${token}`,
'content-type': 'application/json',
},
body,
options: { timeout: { timeoutMs: this.timeoutMs } },
} as RequestInit);
};

let res = await attempt();
Expand Down
27 changes: 26 additions & 1 deletion packages/core/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,34 @@ const DEFAULT_FLUSH_MS = 1000;
const DEFAULT_BATCH_SIZE = 30;
const DEFAULT_QUEUE_MAX = 250;

/**
* Minimal fetch-like contract the transport relies on for batch delivery.
*
* `@smooai/fetch` (the resilient client injected by the node/browser
* adapters) satisfies this: it accepts the same `(url, init)` shape and
* resolves to a `Response` on 2xx, while *throwing* on network errors and
* non-2xx responses after exhausting its own retries. Our `flush()` catch
* block re-queues on any throw, so both the throw-on-failure and
* resolve-on-success paths land in the right place without the transport
* caring which client is underneath. Native `fetch` also satisfies this
* (it just resolves on non-2xx instead of throwing — still handled).
*/
export type TransportFetch = (url: string, init: RequestInit) => Promise<{ ok?: boolean } | Response>;

interface TransportRuntimeAdapter {
/** Whether `navigator.sendBeacon` is available (browser). */
canBeacon: boolean;
/** Beacon implementation, if available. */
beacon?: (url: string, body: string) => boolean;
/** Bind `pagehide` so we can flush via beacon. */
bindLifecycle?: (onPageHide: () => void) => void;
/**
* Resilient fetch used for the timer/batch flush path. Supplied by the
* runtime adapter (node/browser) as the matching `@smooai/fetch` entry
* point so retries/timeouts/circuit-breaking come for free. Falls back
* to the global `fetch` when omitted (e.g. in tests).
*/
fetcher?: TransportFetch;
}

/**
Expand Down Expand Up @@ -56,7 +77,11 @@ export class Transport {
this.clearTimer();
try {
const payload: IngestPayload = { type: 'error', events: batch };
await fetch(this.opts.dsn, {
// Prefer the resilient client supplied by the runtime adapter
// (`@smooai/fetch`) — it owns retries/timeouts/circuit-breaking.
// Fall back to global fetch when no fetcher was injected (tests).
const fetcher = this.adapter.fetcher ?? ((url, init) => fetch(url, init));
await fetcher(this.opts.dsn, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(payload),
Expand Down
Loading
Loading