Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 170 additions & 36 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ import type {
import { Asset } from "@stellar/stellar-sdk";
import { rolloverInvoice as _rolloverInvoice } from "./invoiceRollover.js";
import { BatchedRpcClient } from "./requestBatcher.js";
import { TimeoutManager, withTimeout } from "./timeout.js";
import type { TimeoutConfig } from "./timeout.js";
import { RequestTimeoutError } from "./errors.js";
import { TraceIdManager } from "./traceId.js";
import type { RpcClient } from "./rpcClient.js";

/** A plugin that extends StellarSplitClient with new methods and lifecycle hooks. */
export interface StellarSplitPlugin {
Expand Down Expand Up @@ -292,6 +297,24 @@ export interface StellarSplitClientConfig {
* selector. Use one or the other, not both.
*/
rpcPoolSize?: number;
/**
* Optional per-method timeout configuration (milliseconds).
* Pass a number to set a single default for all methods, or an object
* where keys are method names and values are timeout durations.
* The special key "default" applies to any method not explicitly listed.
* Defaults to 10 000 ms when omitted.
*
* @example
* { default: 10000, getLeaderboard: 30000, getInvoiceHistory: 20000 }
*/
timeout?: TimeoutConfig;
/**
* Optional injectable RpcClient implementation.
* When provided, all Soroban RPC calls are routed through this client
* instead of the default SorobanRpc.Server. Useful for testing (pass
* a MockRpcClient) or alternative transport environments.
*/
rpcClient?: RpcClient;
}

/** Network configuration. */
Expand Down Expand Up @@ -413,9 +436,14 @@ export class StellarSplitClient {
private _effectiveRpcPoolSize = 0;
private _batcher: BatchedRpcClient | null = null;
private _telemetryHookManager = new TelemetryHookManager();
private _timeoutManager: TimeoutManager | null = null;
private _traceIdManager = new TraceIdManager();
private _injectedRpcClient: RpcClient | null = null;

private get server(): SorobanRpc.Server {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private get server(): any {
return (
this._injectedRpcClient ??
this._rpcClient ??
this._standby?.server ??
this._pool?.select() ??
Expand All @@ -424,6 +452,7 @@ export class StellarSplitClient {
}
private set server(s: SorobanRpc.Server) {
this._rpcClient = null;
this._injectedRpcClient = null;
this._mainServer = s;
}

Expand Down Expand Up @@ -496,8 +525,16 @@ export class StellarSplitClient {
validateOrThrow(config);
this.config = config;
const primaryUrl = Array.isArray(config.rpcUrl) ? config.rpcUrl[0]! : config.rpcUrl;

// Injectable RpcClient (Issue #3): config.rpcClient takes priority over DI container.
this._injectedRpcClient = config.rpcClient ?? null;
this._rpcClient = config.container?.getRPCClient() ?? null;
this._adapter = config.container?.getWalletAdapter() ?? config.adapter ?? null;

// Per-method timeout manager (Issue #1)
if (config.timeout !== undefined) {
this._timeoutManager = new TimeoutManager(config.timeout);
}
this._mainServer = new SorobanRpc.Server(primaryUrl, {
allowHttp: primaryUrl.startsWith("http://"),
});
Expand Down Expand Up @@ -629,51 +666,68 @@ export class StellarSplitClient {
}

/**
* Wraps an async operation with telemetry hooks (onCallStart, onCallEnd, onError).
* Wraps an async operation with telemetry hooks (onCallStart, onCallEnd, onError)
* and propagates a traceId through the call stack.
* Fire-and-forget semantics: hook errors do not propagate to the caller.
*/
private async _withTelemetry<T>(
method: string,
args: Record<string, unknown> | undefined,
operation: () => Promise<T>
operation: () => Promise<T>,
opts?: { traceId?: string; timeout?: number }
): Promise<T> {
const traceId = opts?.traceId ?? this._traceIdManager.generate();
const startTime = Date.now();
this._telemetryHookManager.fireOnCallStart({
method,
args,
timestamp: startTime,
traceId,
});

try {
const result = await operation();
const durationMs = Date.now() - startTime;
this._telemetryHookManager.fireOnCallEnd({
method,
durationMs,
success: true,
timestamp: Date.now(),
});
return result;
} catch (error) {
const durationMs = Date.now() - startTime;
const stellarError = error as StellarSplitError;
const run = async (): Promise<T> => {
try {
const result = await operation();
const durationMs = Date.now() - startTime;
this._telemetryHookManager.fireOnCallEnd({
method,
durationMs,
success: true,
timestamp: Date.now(),
traceId,
});
return result;
} catch (error) {
const durationMs = Date.now() - startTime;
const stellarError = error as StellarSplitError;

this._telemetryHookManager.fireOnError(stellarError, {
method,
args,
timestamp: Date.now(),
});
this._telemetryHookManager.fireOnError(stellarError, {
method,
args,
timestamp: Date.now(),
traceId,
});

this._telemetryHookManager.fireOnCallEnd({
method,
durationMs,
success: false,
error: stellarError,
timestamp: Date.now(),
});
this._telemetryHookManager.fireOnCallEnd({
method,
durationMs,
success: false,
error: stellarError,
timestamp: Date.now(),
traceId,
});

throw error;
throw error;
}
};

const timeoutMs =
opts?.timeout ?? this._timeoutManager?.resolveTimeout(method);

if (timeoutMs !== undefined) {
return withTimeout(() => run(), timeoutMs, method);
}
return run();
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -748,6 +802,34 @@ export class StellarSplitClient {
this._telemetryHookManager.clearHooks();
}

// ---------------------------------------------------------------------------
// Timeout config (Issue #1)
// ---------------------------------------------------------------------------

/**
* Returns the resolved timeout (in ms) for each known SDK method.
* Reflects both the `default` timeout and any per-method overrides.
* Returns an empty object when no `timeout` option was set at construction.
*/
getTimeoutConfig(): Record<string, number> {
return this._timeoutManager?.getTimeoutConfig() ?? {};
}

// ---------------------------------------------------------------------------
// Trace ID (Issue #2)
// ---------------------------------------------------------------------------

/**
* Replace the default UUID v4 generator with a custom function.
* Useful for integrating OpenTelemetry span IDs or other systems.
*
* @example
* sdk.setDefaultTraceIdGenerator(() => opentelemetry.trace.getActiveSpan()?.spanContext().traceId ?? crypto.randomUUID());
*/
setDefaultTraceIdGenerator(generator: () => string): void {
this._traceIdManager.setGenerator(generator);
}

// ---------------------------------------------------------------------------
// Dispute management
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1174,12 +1256,12 @@ export class StellarSplitClient {
*/
async getInvoice(
invoiceId: string,
opts?: { retry?: PerMethodRetryOptions }
opts?: { retry?: PerMethodRetryOptions; traceId?: string; timeout?: number }
): Promise<Invoice> {
return this._withCache("getInvoice", [invoiceId], async () => {
const fetcher = this._batcher
? () => this._batcher!.getInvoice(invoiceId)
: () => this._fetchInvoice(invoiceId);
: () => this._fetchInvoice(invoiceId, opts?.traceId);

const effectiveRetry = opts?.retry ?? (this._retryOptions ? {} : undefined);
if (this._retryOptions && effectiveRetry !== undefined) {
Expand All @@ -1193,9 +1275,9 @@ export class StellarSplitClient {
});
}

private async _fetchInvoice(invoiceId: string): Promise<Invoice> {
private async _fetchInvoice(invoiceId: string, traceId?: string): Promise<Invoice> {
const startTime = Date.now();
const req = { method: "getInvoice", params: [invoiceId] };
const req = { method: "getInvoice", params: [invoiceId], headers: traceId ? { "X-Trace-Id": traceId } : undefined };
await runRequestInterceptors(req);

const fetchFn = async (): Promise<Invoice> => {
Expand Down Expand Up @@ -2952,7 +3034,7 @@ export class StellarSplitClient {
return this._fetchPaymentHistory(invoiceId);
}

private async _fetchPaymentHistory(invoiceId: string): Promise<Payment[]> {
private async _fetchPaymentHistory(invoiceId: string, traceId?: string): Promise<Payment[]> {
const startTime = Date.now();
try {
const NUM_SHARDS = 8;
Expand All @@ -2966,7 +3048,7 @@ export class StellarSplitClient {
);

const shardResults = await Promise.allSettled(
operations.map((op) => this._simulateView(op))
operations.map((op) => this._simulateView(op, traceId))
);

const allPayments: Payment[] = [];
Expand Down Expand Up @@ -3370,7 +3452,7 @@ export class StellarSplitClient {
}

/** Simulate a read-only contract call and return the native-decoded result. */
private async _simulateView(operation: xdr.Operation): Promise<unknown> {
private async _simulateView(operation: xdr.Operation, traceId?: string): Promise<unknown> {
const account = await this.server.getAccount(this.config.contractId).catch(() => null);
const sourceAccount = account ?? new Account(this.config.contractId, "0");

Expand All @@ -3382,6 +3464,10 @@ export class StellarSplitClient {
.setTimeout(30)
.build();

if (traceId) {
await runRequestInterceptors({ method: "_simulateView", params: [], headers: { "X-Trace-Id": traceId } });
}

const simResult = await this.server.simulateTransaction(tx);
if (SorobanRpc.Api.isSimulationError(simResult)) {
throw new SimulationFailedError(`Simulation failed: ${simResult.error}`, "_simulateView", simResult.error);
Expand Down Expand Up @@ -4154,6 +4240,54 @@ export class StellarSplitClient {
}
}

// ---------------------------------------------------------------------------
// Leaderboard & invoice history (used in per-method timeout examples)
// ---------------------------------------------------------------------------

/**
* Fetch the top creators by invoice volume from the contract.
*
* @param opts - Optional per-call timeout and trace ID overrides.
* @returns Array of creator addresses sorted by invoice volume descending.
*/
async getLeaderboard(
opts?: { timeout?: number; traceId?: string }
): Promise<Array<{ creator: string; invoiceCount: number; totalVolume: bigint }>> {
return this._withTelemetry(
"getLeaderboard",
undefined,
async () => {
const operation = this.contract.call("get_leaderboard");
const raw = await this._simulateView(operation, opts?.traceId);
if (!Array.isArray(raw)) return [];
return (raw as Array<Record<string, unknown>>).map((entry) => ({
creator: String(entry.creator ?? ""),
invoiceCount: Number(entry.invoice_count ?? 0),
totalVolume: BigInt((entry.total_volume as string | number | bigint) ?? 0),
}));
},
opts
);
}

/**
* Fetch the full payment history for an invoice.
*
* @param invoiceId - The invoice ID.
* @param opts - Optional per-call timeout and trace ID overrides.
*/
async getInvoiceHistory(
invoiceId: string,
opts?: { timeout?: number; traceId?: string }
): Promise<Payment[]> {
return this._withTelemetry(
"getInvoiceHistory",
{ invoiceId },
() => this._fetchPaymentHistory(invoiceId, opts?.traceId),
opts
);
}

}

/** Coerce a native-decoded scalar (bigint | number | string) into a bigint, defaulting to 0n. */
Expand Down
22 changes: 22 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,28 @@ export function isChannelReconciliationError(err: unknown): err is ChannelReconc
return err instanceof ChannelReconciliationError;
}

/** Thrown when a request exceeds its configured timeout. */
export class RequestTimeoutError extends StellarSplitError {
readonly method: string;
readonly timeoutMs: number;

constructor(method: string, timeoutMs: number) {
super(
`Request timed out after ${timeoutMs}ms (method: ${method})`,
"REQUEST_TIMEOUT",
{ method, timeoutMs }
);
this.name = "RequestTimeoutError";
this.method = method;
this.timeoutMs = timeoutMs;
Object.setPrototypeOf(this, new.target.prototype);
}
}

export function isRequestTimeoutError(err: unknown): err is RequestTimeoutError {
return err instanceof RequestTimeoutError;
}

/** Thrown when too many concurrent invoice subscriptions are created. */
export class TooManySubscriptionsError extends StellarSplitError {
constructor(maxSubscriptions: number = 10) {
Expand Down
14 changes: 14 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ export {
isChannelReconciliationError,
TooManySubscriptionsError,
isTooManySubscriptionsError,
RequestTimeoutError,
isRequestTimeoutError,
} from "./errors.js";
export { getScheduledReleaseCountdown } from "./client.js";
export { verifyCompletionProof } from "./client.js";
Expand Down Expand Up @@ -246,6 +248,18 @@ export type {
} from "./types.js";
export { InvalidTransitionError } from "./types.js";

// Per-method timeout (Issue #1)
export { TimeoutManager, withTimeout, RequestTimeoutError as TimeoutError } from "./timeout.js";
export type { TimeoutConfig } from "./timeout.js";

// Trace IDs (Issue #2)
export { TraceIdManager, globalTraceIdManager } from "./traceId.js";
export type { TraceIdGenerator } from "./traceId.js";

// Injectable RpcClient (Issue #3)
export { SorobanRpcAdapter } from "./rpcClient.js";
export type { RpcClient } from "./rpcClient.js";

export { negotiateVersion, SDK_CONTRACT_VERSION } from "./version.js";
export type { VersionInfo } from "./types.js";

Expand Down
1 change: 1 addition & 0 deletions src/interceptors.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export interface RPCRequest {
method: string;
params: unknown[];
headers?: Record<string, string>;
}

export interface RPCResponse {
Expand Down
Loading