diff --git a/web-admin/src/features/dashboards/share/ShareDashboardPopover.svelte b/web-admin/src/features/dashboards/share/ShareDashboardPopover.svelte index 415f1ac578d..ee24013295d 100644 --- a/web-admin/src/features/dashboards/share/ShareDashboardPopover.svelte +++ b/web-admin/src/features/dashboards/share/ShareDashboardPopover.svelte @@ -14,6 +14,8 @@ TabsList, TabsTrigger, } from "@rilldata/web-common/components/tabs"; + import Tooltip from "@rilldata/web-common/components/tooltip/Tooltip.svelte"; + import TooltipContent from "@rilldata/web-common/components/tooltip/TooltipContent.svelte"; import { featureFlags } from "@rilldata/web-common/features/feature-flags"; export let createMagicAuthTokens: boolean; @@ -39,9 +41,12 @@ }} > - + + + Share dashboard + diff --git a/web-admin/src/features/projects/user-management/ShareProjectPopover.svelte b/web-admin/src/features/projects/user-management/ShareProjectPopover.svelte index 03094881b88..06987de106e 100644 --- a/web-admin/src/features/projects/user-management/ShareProjectPopover.svelte +++ b/web-admin/src/features/projects/user-management/ShareProjectPopover.svelte @@ -8,6 +8,8 @@ PopoverContent, PopoverTrigger, } from "@rilldata/web-common/components/popover"; + import Tooltip from "@rilldata/web-common/components/tooltip/Tooltip.svelte"; + import TooltipContent from "@rilldata/web-common/components/tooltip/TooltipContent.svelte"; import { copyWithAdditionalArguments } from "@rilldata/web-common/lib/url-utils.ts"; import { onMount } from "svelte"; @@ -33,7 +35,12 @@ - + + + Share project + this.enforceMaxConcurrentStreams(), ); + conversation.on("conversation-forked", (newConversationId) => + this.handleConversationForked($conversationId, newConversationId), + ); this.conversations.set($conversationId, conversation); return conversation; }, @@ -223,6 +226,26 @@ export class ConversationManager { void invalidateConversationsList(this.instanceId); } + /** + * Handle conversation forking - updates state to navigate to the forked conversation + * Called when a non-owner sends a message and the conversation is forked + */ + private handleConversationForked( + originalConversationId: string, + newConversationId: string, + ): void { + // Get the forked conversation (which is the updated instance) + const forkedConversation = this.conversations.get(originalConversationId); + if (forkedConversation) { + // Move the conversation to the new ID in our map + this.conversations.delete(originalConversationId); + this.conversations.set(newConversationId, forkedConversation); + } + + // Navigate to the new forked conversation + this.conversationSelector.selectConversation(newConversationId); + } + /** * Rotates the new conversation: moves current "new" conversation to the conversations map * and creates a fresh "new" conversation instance diff --git a/web-common/src/features/chat/core/conversation.spec.ts b/web-common/src/features/chat/core/conversation.spec.ts new file mode 100644 index 00000000000..3841e844e5b --- /dev/null +++ b/web-common/src/features/chat/core/conversation.spec.ts @@ -0,0 +1,249 @@ +import { queryClient } from "@rilldata/web-common/lib/svelte-query/globalQueryClient"; +import { + getRuntimeServiceGetConversationQueryKey, + type V1GetConversationResponse, + type V1Message, +} from "@rilldata/web-common/runtime-client"; +import { get } from "svelte/store"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { Conversation } from "./conversation"; +import { NEW_CONVERSATION_ID } from "./utils"; + +// ============================================================================= +// MOCKS +// ============================================================================= + +vi.mock("@rilldata/web-common/runtime-client", async (importOriginal) => { + const original = + await importOriginal< + typeof import("@rilldata/web-common/runtime-client") + >(); + return { + ...original, + runtimeServiceForkConversation: vi.fn(), + }; +}); + +vi.mock("@rilldata/web-common/runtime-client/runtime-store", () => ({ + runtime: { + subscribe: (fn: (value: { host: string }) => void) => { + fn({ host: "http://localhost:9009" }); + return () => {}; + }, + }, +})); + +import { runtimeServiceForkConversation } from "@rilldata/web-common/runtime-client"; + +// ============================================================================= +// TEST CONSTANTS +// ============================================================================= + +const INSTANCE_ID = "test-instance"; +const ORIGINAL_CONVERSATION_ID = "original-conv-123"; +const FORKED_CONVERSATION_ID = "forked-conv-456"; + +// ============================================================================= +// HELPERS +// ============================================================================= + +function getCacheKey(conversationId: string) { + return getRuntimeServiceGetConversationQueryKey(INSTANCE_ID, conversationId); +} + +function getCachedData(conversationId: string) { + return queryClient.getQueryData( + getCacheKey(conversationId), + ); +} + +function seedCache( + conversationId: string, + options: { + isOwner: boolean; + messages?: Partial[]; + title?: string; + createdOn?: string; + }, +) { + queryClient.setQueryData( + getCacheKey(conversationId), + { + conversation: { + id: conversationId, + title: options.title, + createdOn: options.createdOn, + }, + messages: options.messages as V1Message[], + isOwner: options.isOwner, + }, + ); +} + +function mockForkSuccess(forkedId: string = FORKED_CONVERSATION_ID) { + vi.mocked(runtimeServiceForkConversation).mockResolvedValue({ + conversationId: forkedId, + }); +} + +function mockForkFailure(error: Error = new Error("Fork failed")) { + vi.mocked(runtimeServiceForkConversation).mockRejectedValue(error); +} + +function mockForkEmptyResponse() { + vi.mocked(runtimeServiceForkConversation).mockResolvedValue({}); +} + +function createConversation(conversationId: string = ORIGINAL_CONVERSATION_ID) { + return new Conversation(INSTANCE_ID, conversationId); +} + +async function sendMessageAndIgnoreStreamError( + conversation: Conversation, + message: string, +) { + conversation.draftMessage.set(message); + await conversation.sendMessage({}).catch(() => {}); +} + +// ============================================================================= +// TESTS +// ============================================================================= + +describe("Conversation", () => { + beforeEach(() => { + queryClient.clear(); + vi.clearAllMocks(); + }); + + afterEach(() => { + queryClient.clear(); + }); + + describe("forkConversation", () => { + it("forks and copies messages when non-owner sends a message", async () => { + // Arrange + seedCache(ORIGINAL_CONVERSATION_ID, { + isOwner: false, + title: "Shared conversation", + createdOn: "2024-01-01T00:00:00Z", + messages: [ + { id: "msg-1", role: "user", contentData: "Hello" }, + { id: "msg-2", role: "assistant", contentData: "Hi there!" }, + ], + }); + mockForkSuccess(); + + const conversation = createConversation(); + let forkedId: string | null = null; + conversation.on("conversation-forked", (id) => (forkedId = id)); + + // Act + conversation.draftMessage.set("My follow-up question"); + const sendPromise = conversation.sendMessage({}); + await vi.waitFor(() => expect(forkedId).toBe(FORKED_CONVERSATION_ID)); + + // Assert: fork API called correctly + expect(runtimeServiceForkConversation).toHaveBeenCalledWith( + INSTANCE_ID, + ORIGINAL_CONVERSATION_ID, + {}, + ); + + // Assert: cache updated with forked conversation + const forkedData = getCachedData(FORKED_CONVERSATION_ID); + expect(forkedData?.conversation?.id).toBe(FORKED_CONVERSATION_ID); + expect(forkedData?.conversation?.title).toBe("Shared conversation"); + expect(forkedData?.conversation?.createdOn).toBe("2024-01-01T00:00:00Z"); + expect(forkedData?.isOwner).toBe(true); + + // Assert: messages copied + optimistic message added + expect(forkedData?.messages).toHaveLength(3); + expect(forkedData?.messages?.[0]?.contentData).toBe("Hello"); + expect(forkedData?.messages?.[1]?.contentData).toBe("Hi there!"); + expect(forkedData?.messages?.[2]?.role).toBe("user"); + + // Cleanup + conversation.cleanup(); + await sendPromise.catch(() => {}); + }); + + it("does NOT fork when owner sends a message", async () => { + // Arrange + seedCache(ORIGINAL_CONVERSATION_ID, { isOwner: true, messages: [] }); + const conversation = createConversation(); + + // Act + await sendMessageAndIgnoreStreamError(conversation, "My message"); + + // Assert + expect(runtimeServiceForkConversation).not.toHaveBeenCalled(); + + conversation.cleanup(); + }); + + it("does NOT fork for new conversations", async () => { + // Arrange + const conversation = createConversation(NEW_CONVERSATION_ID); + + // Act + await sendMessageAndIgnoreStreamError(conversation, "First message"); + + // Assert + expect(runtimeServiceForkConversation).not.toHaveBeenCalled(); + + conversation.cleanup(); + }); + + it("sets error and stops streaming if fork API fails", async () => { + // Arrange + seedCache(ORIGINAL_CONVERSATION_ID, { isOwner: false, messages: [] }); + mockForkFailure(); + const conversation = createConversation(); + + // Act + conversation.draftMessage.set("My message"); + await conversation.sendMessage({}); + + // Assert + expect(get(conversation.streamError)).toContain( + "Failed to create your copy", + ); + expect(get(conversation.isStreaming)).toBe(false); + + conversation.cleanup(); + }); + + it("sets error if fork response is missing conversation ID", async () => { + // Arrange + seedCache(ORIGINAL_CONVERSATION_ID, { isOwner: false, messages: [] }); + mockForkEmptyResponse(); + const conversation = createConversation(); + + // Act + conversation.draftMessage.set("My message"); + await conversation.sendMessage({}); + + // Assert + expect(get(conversation.streamError)).toContain( + "Failed to create your copy", + ); + expect(get(conversation.isStreaming)).toBe(false); + + conversation.cleanup(); + }); + + it("does NOT fork when cache is empty (optimistic ownership)", async () => { + // Arrange: no cache data seeded - ownership defaults to true + const conversation = createConversation(); + + // Act + await sendMessageAndIgnoreStreamError(conversation, "Test"); + + // Assert + expect(runtimeServiceForkConversation).not.toHaveBeenCalled(); + + conversation.cleanup(); + }); + }); +}); diff --git a/web-common/src/features/chat/core/conversation.ts b/web-common/src/features/chat/core/conversation.ts index da0d1d64f16..bf1ebe8221d 100644 --- a/web-common/src/features/chat/core/conversation.ts +++ b/web-common/src/features/chat/core/conversation.ts @@ -2,6 +2,7 @@ import { queryClient } from "@rilldata/web-common/lib/svelte-query/globalQueryCl import { getRuntimeServiceGetConversationQueryKey, getRuntimeServiceGetConversationQueryOptions, + runtimeServiceForkConversation, type RpcStatus, type RuntimeServiceCompleteBody, type V1CompleteStreamingResponse, @@ -15,7 +16,13 @@ import { type SSEMessage, } from "@rilldata/web-common/runtime-client/sse-fetch-client"; import { createQuery, type CreateQueryResult } from "@tanstack/svelte-query"; -import { derived, get, writable, type Readable } from "svelte/store"; +import { + derived, + get, + writable, + type Readable, + type Writable, +} from "svelte/store"; import type { HTTPError } from "../../../runtime-client/fetchWrapper"; import { transformToBlocks, type Block } from "./messages/block-transform"; import { MessageContentType, MessageType, ToolName } from "./types"; @@ -28,6 +35,7 @@ import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts"; type ConversationEvents = { "conversation-created": string; + "conversation-forked": string; "stream-start": void; message: V1Message; "stream-complete": string; @@ -58,33 +66,70 @@ export class Conversation { private sseClient: SSEFetchClient | null = null; private hasReceivedFirstMessage = false; + // Reactive conversation ID - enables query to auto-update when ID changes (e.g., after fork) + private readonly conversationIdStore: Writable; + private readonly conversationQuery: CreateQueryResult< + V1GetConversationResponse, + RpcStatus + >; + + public get conversationId(): string { + return get(this.conversationIdStore); + } + + private set conversationId(value: string) { + this.conversationIdStore.set(value); + } + constructor( private readonly instanceId: string, - public conversationId: string, - private readonly agent: string = ToolName.ANALYST_AGENT, // Hardcoded default for now - ) {} + initialConversationId: string, + private readonly agent: string = ToolName.ANALYST_AGENT, + ) { + this.conversationIdStore = writable(initialConversationId); + + // Create query with reactive options that respond to conversationId changes + const queryOptionsStore = derived( + this.conversationIdStore, + ($conversationId) => + getRuntimeServiceGetConversationQueryOptions( + this.instanceId, + $conversationId, + { + query: { + enabled: $conversationId !== NEW_CONVERSATION_ID, + staleTime: Infinity, // We manage cache manually during streaming + }, + }, + ), + ); + this.conversationQuery = createQuery(queryOptionsStore, queryClient); + } + + /** + * Get ownership status from the conversation query. + * Returns true if the current user owns this conversation or if ownership is unknown. + */ + private getIsOwner(): boolean { + if (this.conversationId === NEW_CONVERSATION_ID) { + return true; // New conversations are always owned by the creator + } + + // Default to true if query hasn't loaded yet (optimistic assumption) + return get(this.conversationQuery).data?.isOwner ?? true; + } // ===== PUBLIC API ===== /** - * Get a reactive query for this conversation's data + * Get a reactive query for this conversation's data. + * The query reacts to conversationId changes (e.g., after fork). */ public getConversationQuery(): CreateQueryResult< V1GetConversationResponse, RpcStatus > { - return createQuery( - getRuntimeServiceGetConversationQueryOptions( - this.instanceId, - this.conversationId, - { - query: { - enabled: this.conversationId !== NEW_CONVERSATION_ID, - }, - }, - ), - queryClient, - ); + return this.conversationQuery; } /** @@ -140,6 +185,24 @@ export class Conversation { this.isStreaming.set(true); this.hasReceivedFirstMessage = false; + // Fork conversation if user is not the owner (viewing a shared conversation) + const isOwner = this.getIsOwner(); + if (!isOwner && this.conversationId !== NEW_CONVERSATION_ID) { + try { + const forkedConversationId = await this.forkConversation(); + // Update to the forked conversation (setter updates the reactive store) + this.conversationId = forkedConversationId; + this.events.emit("conversation-forked", forkedConversationId); + } catch (error) { + console.error("[Conversation] Fork failed:", error); + this.isStreaming.set(false); + this.streamError.set( + "Failed to create your copy of this conversation. Please try again.", + ); + return; + } + } + const userMessage = this.addOptimisticUserMessage(prompt); try { @@ -323,6 +386,61 @@ export class Conversation { // ----- Conversation Lifecycle ----- + /** + * Fork the current conversation to create a copy owned by the current user. + * Used when a non-owner wants to continue a shared conversation. + * + * Note: The cache copying logic here follows the pattern established by + * `transitionToRealConversation`—both read from an old cache key and write + * to a new one with an updated conversation ID. However, since forking + * conceptually creates a new conversation from an existing one, this + * responsibility might be better suited for ConversationManager in the future. + */ + private async forkConversation(): Promise { + const originalConversationId = this.conversationId; + + const response = await runtimeServiceForkConversation( + this.instanceId, + this.conversationId, + {}, + ); + + if (!response.conversationId) { + throw new Error("Fork response missing conversation ID"); + } + + const forkedConversationId = response.conversationId; + + // Copy cached messages from original conversation to forked conversation + // This ensures the UI shows the conversation history immediately + const originalCacheKey = getRuntimeServiceGetConversationQueryKey( + this.instanceId, + originalConversationId, + ); + const forkedCacheKey = getRuntimeServiceGetConversationQueryKey( + this.instanceId, + forkedConversationId, + ); + const originalData = + queryClient.getQueryData(originalCacheKey); + + if (originalData) { + queryClient.setQueryData(forkedCacheKey, { + conversation: { + ...originalData.conversation, + id: forkedConversationId, + }, + messages: originalData.messages || [], + isOwner: true, // User now owns the forked conversation + }); + } + + // Invalidate the conversations list to show the new forked conversation + void invalidateConversationsList(this.instanceId); + + return forkedConversationId; + } + /** * Transition from NEW_CONVERSATION_ID to real conversation ID * Transfers all cached data to the new conversation cache @@ -356,7 +474,7 @@ export class Conversation { // Clean up the old "new" conversation cache queryClient.removeQueries({ queryKey: oldCacheKey }); - // Update the conversation ID + // Update the conversation ID (setter updates the reactive store) this.conversationId = realConversationId; // Notify that conversation was created diff --git a/web-common/src/features/chat/layouts/fullpage/FullPageChat.svelte b/web-common/src/features/chat/layouts/fullpage/FullPageChat.svelte index d2795a8d738..c79b9f5e083 100644 --- a/web-common/src/features/chat/layouts/fullpage/FullPageChat.svelte +++ b/web-common/src/features/chat/layouts/fullpage/FullPageChat.svelte @@ -1,5 +1,6 @@ - - + + Conversation history + + import { page } from "$app/stores"; import IconButton from "../../../../components/button/IconButton.svelte"; import Close from "../../../../components/icons/Close.svelte"; import PlusIcon from "../../../../components/icons/PlusIcon.svelte"; import { type V1Conversation } from "../../../../runtime-client"; + import { runtime } from "../../../../runtime-client/runtime-store"; import type { ConversationManager } from "../../core/conversation-manager"; + import ShareChatPopover from "../../share/ShareChatPopover.svelte"; import ConversationHistoryMenu from "./ConversationHistoryMenu.svelte"; export let conversationManager: ConversationManager; export let onNewConversation: () => void; export let onClose: () => void; + $: ({ instanceId } = $runtime); + $: organization = $page.params.organization; + $: project = $page.params.project; + $: currentConversationStore = conversationManager.getCurrentConversation(); $: getConversationQuery = $currentConversationStore?.getConversationQuery(); $: currentConversationDto = $getConversationQuery?.data?.conversation ?? null; @@ -36,8 +43,17 @@ on:click={handleNewConversation} > + New conversation + + + Close diff --git a/web-common/src/features/chat/share/ShareChatPopover.svelte b/web-common/src/features/chat/share/ShareChatPopover.svelte new file mode 100644 index 00000000000..3e74a729911 --- /dev/null +++ b/web-common/src/features/chat/share/ShareChatPopover.svelte @@ -0,0 +1,103 @@ + + + + + + + + {disabled ? disabledTooltip : "Share conversation"} + + + + +
+

Share conversation

+

+ Share this conversation with other project members. They can view and + continue the conversation. +

+ {#if shareError} +

{shareError}

+ {/if} + +
+
+