Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/backend/clients/event/EventClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,19 @@ export class EventClient extends PuterClient {
this.#eventListeners[key] ?? (this.#eventListeners[key] = []);
listeners.push(callback as EventListener);
}
off<P extends ListenKey>(
key: P,
callback: (
key: MatchingEvents<P>,
data: EventMap[MatchingEvents<P>],
meta: EventMetadata,
) => Promise<void> | void,
) {
const listeners = this.#eventListeners[key];
if (!listeners) return;
const idx = listeners.indexOf(callback as EventListener);
if (idx !== -1) listeners.splice(idx, 1);
}
async #emitEvent<T extends keyof EventMap>(
listener: EventListener,
key: T,
Expand Down
2 changes: 2 additions & 0 deletions src/backend/clients/event/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ export type EventMap = {
// normalized path: `route.<method>.<path>.before|after|error|reject`. Same
// wildcard + veto semantics as the driver lifecycle above.
[K in `route.${string}`]: RouteLifecycleEvent;
} & {
[K in `pubsub.login.${string}`]: { authtoken: string };
};

/**
Expand Down
59 changes: 58 additions & 1 deletion src/backend/controllers/auth/AuthController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import bcrypt from 'bcrypt';
import type { Request, RequestHandler, Response } from 'express';
import crypto from 'node:crypto';
import { v4 as uuidv4 } from 'uuid';
import { v4 as uuidv4, validate as validateUuid } from 'uuid';
import validator from 'validator';
import { Controller, Get, Post } from '../../core/http/decorators.js';
import { HttpError } from '../../core/http/HttpError.js';
Expand Down Expand Up @@ -95,6 +95,63 @@ const RESERVED_USERNAMES = new Set([
*/
@Controller('')
export class AuthController extends PuterController {
@Post('/login/wait', {
subdomain: ['api'],
})
async loginWait(req: Request, res: Response) {
const { session } = req.body;
// validate uuid to prevent ultra long key or listening on pubsub.login.*
if (!session || !validateUuid(session)) {
throw new HttpError(400, 'session is required.', {
legacyCode: 'bad_request',
});
}
const { resolve, promise } = Promise.withResolvers<void>();

let token: string | null = null;
const listener = (_key: string, value: { authtoken: string }) => {
token = value.authtoken;
resolve();
};
this.clients.event.on(`pubsub.login.${session}`, listener);

const timeout = new Promise<void>((resolve) =>
setTimeout(resolve, 10000),
);
await Promise.race([promise, timeout]);
if (!token) {
throw new HttpError(408, 'Request timeout.', {
legacyCode: 'request_timeout',
});
}
this.clients.event.off(`pubsub.login.${session}`, listener);

res.json({
auth_token: token,
});
}
@Post('/login/set', {
subdomain: ['api'],
})
async loginSet(req: Request, res: Response) {
const { session, auth_token } = req.body;
if (!session || !auth_token || !validateUuid(session)) {
throw new HttpError(400, 'session and auth_token are required.', {
legacyCode: 'bad_request',
});
}

this.clients.event.emit(
`pubsub.login.${session}`,
{
authtoken: auth_token,
},
{},
);

res.json({ success: true });
}

// -- Login -------------------------------------------------------

@Post('/login', {
Expand Down
70 changes: 65 additions & 5 deletions src/backend/services/broadcast/BroadcastService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import axios from 'axios';
import { createHmac, timingSafeEqual } from 'node:crypto';
import { createHmac, randomUUID, timingSafeEqual } from 'node:crypto';
import { Agent as HttpsAgent } from 'node:https';
import { IBroadcastPeerConfig } from '../../types.js';
import { PuterService } from '../types.js';
Expand Down Expand Up @@ -81,15 +81,14 @@ interface IncomingHeaders {
* - Inbound handler ignores POSTs whose `X-Broadcast-Peer-Id` matches
* this server's own peerId (catches misconfigured loopbacks).
*
* No Redis pub/sub here — webhooks are the only transport. Same-cluster
* fan-out is handled by sockets via the Redis streams adapter, so an
* additional Redis channel here would just duplicate work.
*/
export class BroadcastService extends PuterService {
/** peerId → resolved peer config, used for incoming-verify lookup. */
#peersByKey: Record<string, IBroadcastPeerConfig> = {};
/** Subset of peers with `webhook: true`, used for outbound fan-out. */
#webhookPeers: IBroadcastPeerConfig[] = [];
/** Identifier used to tell what server a redis fan-out is coming from. */
#redisSourceId: string = `${this.config.serverId}:${randomUUID()}`;

/** Coalesced outbound events, keyed by serialized shape. */
#outboundEventsByDedupKey = new Map<string, BroadcastEvent>();
Expand All @@ -103,12 +102,14 @@ export class BroadcastService extends PuterService {
#webhookHostHeader: string | null = null;
/** Self-signed certs are common between Puter nodes — accept them. */
#webhookHttpsAgent = new HttpsAgent({ rejectUnauthorized: false });
#redisSub: ReturnType<typeof this.clients.redis.duplicate> | null = null;

// -- Lifecycle ---------------------------------------------------

override onServerStart(): void {
this.#loadConfig();
this.#subscribeOutbound();
this.#subscribeRedisOutbound();
}

override async onServerPrepareShutdown(): Promise<void> {
Expand All @@ -123,6 +124,11 @@ export class BroadcastService extends PuterService {
} catch (err) {
console.warn('[broadcast] final flush failed', err);
}
if (this.#redisSub) {
await this.#redisSub.unsubscribe('pubsub');
this.#redisSub.quit();
this.#redisSub = null;
}
}

// -- Public API used by BroadcastController ----------------------
Expand Down Expand Up @@ -231,8 +237,62 @@ export class BroadcastService extends PuterService {
return { ok: true };
}

// -- Outbound: subscribe + queue + flush ------------------------
#pubsubFanout(key: string, data: unknown, meta: object): void {
const safeMeta = this.#normalizeMeta(meta);
if (safeMeta.from_fanout) return;
this.clients.redis.publish(
'pubsub',
JSON.stringify({
key,
data,
meta: safeMeta,
source: this.#redisSourceId,
}),
);
}

// outer.pubsub.* events will be broadcast to other clusters through webhooks
// pubsub.* will only fan-out to same-cluster nodes.
#subscribeRedisOutbound(): void {
this.#redisSub = this.clients.redis.duplicate();
this.#redisSub.subscribe('pubsub');
this.#redisSub.on('message', (channel: string, message: string) => {
if (channel !== 'pubsub') return;
const parsed = JSON.parse(message);
const { key, data, meta, source } = parsed as {
key: string;
data: unknown;
meta: object;
source: string;
};
if (source === this.#redisSourceId) return;
const safeMeta = this.#normalizeMeta(meta);

this.clients.event.emit(key, data, {
...safeMeta,
from_fanout: true,
// it's not from outside, but mark it as to prevent sending the webhook twice
from_outside: true,
});
});

this.clients.event.on(
'outer.pubsub.*',
(key: string, data: unknown, meta: object) => {
this.#pubsubFanout(key, data, meta);
},
);
this.clients.event.on(
'pubsub.*',
(key: string, data: unknown, meta: object) => {
this.#pubsubFanout(key, data, meta);
},
);
}

// -- Outbound: subscribe + queue + flush ------------------------
// outer.* events will be broadcast to other clusters through webhooks
// outer will NOT automatically sync to same-cluster peers.
#subscribeOutbound(): void {
// Wildcard: every `outer.*` event gets considered for broadcast.
// The handler skips events that came in via webhook (meta.from_outside)
Expand Down
Loading
Loading