Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3ed7b94
refactor(db): rename channel_badge_counts to badge_counts (general pu…
iscekic Apr 29, 2026
e8d062c
feat(db): migration to rename badge_counts and reset rows
iscekic Apr 29, 2026
20b9b3b
feat(notifications): add badge-bucket key builders
iscekic Apr 29, 2026
1bb97c6
chore(notifications): add EVENT_SERVICE binding, drop STREAM_CHAT_API…
iscekic Apr 29, 2026
d87c0fb
chore(notifications): add vitest scaffold
iscekic Apr 29, 2026
2a621db
feat(notifications): rewrite NotificationChannelDO around dispatchPush
iscekic Apr 29, 2026
26fccf5
chore(notifications): drop orphan badgeBucketForInstance helper
iscekic Apr 29, 2026
7fad879
feat(notifications): add sendPushForConversation WorkerEntrypoint RPC
iscekic Apr 29, 2026
f6e1848
chore(notifications): delete Stream webhook route
iscekic Apr 29, 2026
3c7c82e
chore(notifications): type EVENT_SERVICE RPC and enable cloudflare:te…
iscekic Apr 29, 2026
227b90e
feat(event-service): add kiloclaw event-context helpers; migrate kilo…
iscekic Apr 29, 2026
87f0fab
feat(kilo-chat): add fetchSandboxLabel helper
iscekic Apr 29, 2026
822d327
chore(kilo-chat): add NOTIFICATIONS service binding
iscekic Apr 29, 2026
372f0a0
feat(kilo-chat): publish push on message.created via NOTIFICATIONS RPC
iscekic Apr 29, 2026
52fe8a6
chore(notifications): drop orphan stream-chat dep, refresh worker typ…
iscekic Apr 29, 2026
4e95291
fix(notifications): named entrypoint export, retry-safe badge, alarm-…
iscekic Apr 29, 2026
4faf0dd
fix(notifications): close two cleanup-alarm leaks
iscekic Apr 29, 2026
8d7b9d7
refactor(event-service): compose presence contexts from kiloclaw helpers
iscekic Apr 29, 2026
893b7f1
feat(web): add kiloChat.getToken tRPC procedure
iscekic Apr 29, 2026
a35c98c
refactor(web): use kiloclaw-context helpers for event subscriptions
iscekic Apr 29, 2026
a43585d
feat(web): lift EventServiceClient to global provider
iscekic Apr 29, 2026
e98f370
feat(web): add usePresenceSubscription primitive
iscekic Apr 29, 2026
6bfbf95
refactor(web): collapse kilo-chat event subscriptions into usePresenc…
iscekic Apr 29, 2026
832e2b7
feat(web): subscribe to /presence/web while tab is visible
iscekic Apr 29, 2026
99b52d5
feat(web): subscribe to /presence/kiloclaw/{sandboxId} on instance views
iscekic Apr 29, 2026
bdb99c6
refactor(web): extract useDocumentVisible primitive
iscekic Apr 29, 2026
405b185
feat(web): subscribe to conversation presence while tab visible
iscekic Apr 29, 2026
4429bdf
style(web): reflow useDocumentVisible useState init to one line
iscekic Apr 29, 2026
eca983e
refactor(web): tighten presence hook + kilo-chat router contract
iscekic Apr 29, 2026
7edca1a
fix(event-service): refcount subscribe/unsubscribe by context
iscekic Apr 29, 2026
7a91f33
Merge remote-tracking branch 'origin/feat/kilo-chat-migration-pr1' in…
iscekic Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions packages/event-service/src/__tests__/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,4 +387,102 @@ describe('EventServiceClient', () => {
expect(err).toBeInstanceOf(Error);
expect(err.name).toBe('HandshakeTimeoutError');
});

describe('subscribe/unsubscribe refcounting', () => {
function sentMessages() {
return lastMockWs.sent.map(s => JSON.parse(s) as unknown);
}

it('only sends one wire context.subscribe when two consumers subscribe to the same context', async () => {
const client = makeClient();
await client.connect();
lastMockWs.sent = [];

client.subscribe(['room:1']);
client.subscribe(['room:1']);

expect(sentMessages()).toEqual([{ type: 'context.subscribe', contexts: ['room:1'] }]);
});

it('keeps the subscription alive when one of two consumers unsubscribes', async () => {
const client = makeClient();
await client.connect();
lastMockWs.sent = [];

client.subscribe(['room:1']);
client.subscribe(['room:1']);
client.unsubscribe(['room:1']);

// Only the initial 0→1 subscribe should have been sent. No unsubscribe yet
// because the second consumer is still holding a ref.
expect(sentMessages()).toEqual([{ type: 'context.subscribe', contexts: ['room:1'] }]);
});

it('sends context.unsubscribe only when the last consumer drops the ref (1→0)', async () => {
const client = makeClient();
await client.connect();
lastMockWs.sent = [];

client.subscribe(['room:1']);
client.subscribe(['room:1']);
client.unsubscribe(['room:1']);
client.unsubscribe(['room:1']);

expect(sentMessages()).toEqual([
{ type: 'context.subscribe', contexts: ['room:1'] },
{ type: 'context.unsubscribe', contexts: ['room:1'] },
]);
});

it('handles a mixed batch: only newly-active contexts get sent', async () => {
const client = makeClient();
await client.connect();
client.subscribe(['room:1']);
lastMockWs.sent = [];

// room:1 already at refcount 1, room:2 is new. Only room:2 should hit the wire.
client.subscribe(['room:1', 'room:2']);

expect(sentMessages()).toEqual([{ type: 'context.subscribe', contexts: ['room:2'] }]);
});

it('extra unsubscribes for an unknown context are no-ops', async () => {
const client = makeClient();
await client.connect();
lastMockWs.sent = [];

// Never subscribed — must not crash and must not emit a wire message.
client.unsubscribe(['ghost']);

expect(sentMessages()).toEqual([]);
});

it('resubscribe-on-reconnect deduplicates by context (one entry per active context)', async () => {
vi.useFakeTimers();
const client = makeClient();
await client.connect();

// Two consumers hold the same context.
client.subscribe(['room:1']);
client.subscribe(['room:1']);

// Drop the connection — auto-reconnect kicks in.
lastMockWs.triggerClose();
await vi.advanceTimersByTimeAsync(2000);
expect(allMockWs.length).toBe(2);
// The second mock socket also auto-triggers open via the global stub.
await vi.advanceTimersByTimeAsync(0);

const resubMessages = allMockWs[1].sent
.map(s => JSON.parse(s) as { type: string; contexts?: string[] })
.filter(m => m.type === 'context.subscribe');

// Exactly one resubscribe message containing the context exactly once,
// regardless of how many consumers hold the ref.
expect(resubMessages).toHaveLength(1);
expect(resubMessages[0]?.contexts).toEqual(['room:1']);

vi.useRealTimers();
});
});
});
31 changes: 23 additions & 8 deletions packages/event-service/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ export class EventServiceClient {
private ws: WebSocket | null = null;
private connected = false;
private eventHandlers = new Map<string, Set<(context: string, payload: unknown) => void>>();
private activeContexts = new Set<string>();
// Refcounted so multiple consumers can independently subscribe to and
// unsubscribe from the same context without trampling each other. The wire
// `context.subscribe`/`context.unsubscribe` messages are only sent on the
// 0↔1 transitions; intermediate refcount churn stays client-side.
private activeContexts = new Map<string, number>();
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private destroyed = false;
private reconnectAttempts = 0;
Expand Down Expand Up @@ -219,20 +223,31 @@ export class EventServiceClient {
}

subscribe(contexts: string[]): void {
const newlyActive: string[] = [];
for (const ctx of contexts) {
this.activeContexts.add(ctx);
const next = (this.activeContexts.get(ctx) ?? 0) + 1;
this.activeContexts.set(ctx, next);
if (next === 1) newlyActive.push(ctx);
}
if (this.isConnected()) {
this.send({ type: 'context.subscribe', contexts });
if (newlyActive.length > 0 && this.isConnected()) {
this.send({ type: 'context.subscribe', contexts: newlyActive });
}
}

unsubscribe(contexts: string[]): void {
const released: string[] = [];
for (const ctx of contexts) {
this.activeContexts.delete(ctx);
const current = this.activeContexts.get(ctx);
if (current === undefined) continue;
if (current <= 1) {
this.activeContexts.delete(ctx);
released.push(ctx);
} else {
this.activeContexts.set(ctx, current - 1);
}
}
if (this.isConnected()) {
this.send({ type: 'context.unsubscribe', contexts });
if (released.length > 0 && this.isConnected()) {
this.send({ type: 'context.unsubscribe', contexts: released });
}
}

Expand Down Expand Up @@ -315,7 +330,7 @@ export class EventServiceClient {
if (this.activeContexts.size > 0) {
this.send({
type: 'context.subscribe',
contexts: Array.from(this.activeContexts),
contexts: Array.from(this.activeContexts.keys()),
});
}
}
Expand Down