diff --git a/backend/database/conversations.py b/backend/database/conversations.py index 3a58052a58e..de28913a3ab 100644 --- a/backend/database/conversations.py +++ b/backend/database/conversations.py @@ -5,7 +5,7 @@ from datetime import datetime, timedelta, timezone from typing import List, Optional, Dict, Any -from google.api_core.exceptions import NotFound +from google.api_core.exceptions import Conflict, NotFound from google.cloud import firestore from google.cloud.firestore_v1 import FieldFilter @@ -171,6 +171,38 @@ def get_conversation(uid, conversation_id): return conversation_data +@set_data_protection_level(data_arg_name='conversation_data') +@prepare_for_write(data_arg_name='conversation_data', prepare_func=_prepare_conversation_for_write) +def _create_conversation_doc(uid: str, conversation_data: dict): + # Decorated for the same encryption / data-protection preparation as + # upsert_conversation. Uses Firestore create() which raises AlreadyExists if the + # document already exists; that exception propagates to create_conversation_if_absent. + if 'audio_base64_url' in conversation_data: + del conversation_data['audio_base64_url'] + if 'photos' in conversation_data: + del conversation_data['photos'] + + user_ref = db.collection('users').document(uid) + conversation_ref = user_ref.collection(conversations_collection).document(conversation_data['id']) + conversation_ref.create(conversation_data) + + +def create_conversation_if_absent(uid: str, conversation_data: dict) -> bool: + """Atomically create a conversation; return True if created, False if it already exists. + + Backed by Firestore document.create(), so concurrent writers can't overwrite each + other — true create-if-absent semantics (unlike upsert_conversation, which does a + last-writer-wins set()). + """ + try: + _create_conversation_doc(uid, conversation_data) + return True + except Conflict: + # Firestore create() raises Conflict (its AlreadyExists subclass on gRPC) when + # the document already exists. Catching the base Conflict covers both transports. + return False + + @prepare_for_read(decrypt_func=_prepare_conversation_for_read) @with_photos(get_conversation_photos) def get_conversations( diff --git a/backend/models/import_job.py b/backend/models/import_job.py index a5932f210cc..c3e2fc50eb5 100644 --- a/backend/models/import_job.py +++ b/backend/models/import_job.py @@ -24,6 +24,7 @@ class ImportJob(BaseModel): total_files: int = Field(default=0, description="Total number of files to process") processed_files: int = Field(default=0, description="Number of files processed so far") conversations_created: int = Field(default=0, description="Number of conversations created") + conversations_skipped: int = Field(default=0, description="Number of lifelogs skipped as already imported") created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) started_at: Optional[datetime] = Field(default=None, description="When processing started") completed_at: Optional[datetime] = Field(default=None, description="When processing completed") @@ -47,5 +48,6 @@ class ImportJobResponse(BaseModel): total_files: Optional[int] = None processed_files: Optional[int] = None conversations_created: Optional[int] = None + conversations_skipped: Optional[int] = None created_at: Optional[str] = None error: Optional[str] = None diff --git a/backend/routers/imports.py b/backend/routers/imports.py index 0a678f8b6f3..e616846a840 100644 --- a/backend/routers/imports.py +++ b/backend/routers/imports.py @@ -107,6 +107,7 @@ def get_import_jobs( total_files=job.get('total_files'), processed_files=job.get('processed_files'), conversations_created=job.get('conversations_created'), + conversations_skipped=job.get('conversations_skipped'), created_at=job.get('created_at'), error=job.get('error'), ) @@ -147,6 +148,7 @@ def get_import_job_status( total_files=job.get('total_files'), processed_files=job.get('processed_files'), conversations_created=job.get('conversations_created'), + conversations_skipped=job.get('conversations_skipped'), created_at=job.get('created_at'), error=job.get('error'), ) diff --git a/backend/test.sh b/backend/test.sh index 29ab17f03a6..3b5510d9419 100755 --- a/backend/test.sh +++ b/backend/test.sh @@ -48,6 +48,7 @@ pytest tests/unit/test_translation_optimization.py -v pytest tests/unit/test_translation_cost_optimization.py -v pytest tests/unit/test_conversation_source_unknown.py -v pytest tests/unit/test_conversation_model_split.py -v +pytest tests/unit/test_limitless_import_idempotency.py -v pytest tests/unit/test_transcribe_conversation_cache.py -v pytest tests/unit/test_pusher_private_cloud_data_protection.py -v pytest tests/unit/test_pusher_batch_upload.py -v diff --git a/backend/tests/unit/test_limitless_import_idempotency.py b/backend/tests/unit/test_limitless_import_idempotency.py new file mode 100644 index 00000000000..ad1110463d9 --- /dev/null +++ b/backend/tests/unit/test_limitless_import_idempotency.py @@ -0,0 +1,274 @@ +"""Tests for Limitless import idempotency (deterministic conversation IDs). + +Re-importing the same Limitless export must not create duplicate conversations, +and must not clobber edits a user made to a previously-imported conversation. +This is achieved by deriving each conversation's Firestore document ID +deterministically from (uid, lifelog start-time) and skipping lifelogs that are +already stored ("first import wins"). + +``database.*`` and ``utils.notifications`` are stubbed because they initialise +Firestore / FCM at import time. The Pydantic models are imported for real (no +external init), so this exercises the real ``process_limitless_import``. A small +in-memory store stands in for Firestore's atomic create-if-absent +(``document.create()``), so the tests assert real persistence behaviour (document +count, edit preservation), not just ID generation. True concurrency atomicity is a +property of Firestore ``create()`` itself and is not unit-tested here. +""" + +import hashlib +import io +import os +import sys +import types +import uuid as uuid_lib +from zipfile import ZipFile + +os.environ.setdefault( + "ENCRYPTION_SECRET", + "omi_ZwB2ZNqB2HHpMK6wStk7sTpavJiPTFg7gXUHnc4tFABPU6pZ2c2DKgehtfgi4RZv", +) + +from unittest.mock import MagicMock + + +def _stub_module(name): + mod = types.ModuleType(name) + sys.modules[name] = mod + return mod + + +# --- Pre-mock heavy deps (Firestore / FCM) before importing the module under test --- +if "database" not in sys.modules: + _database_mod = _stub_module("database") + _database_mod.__path__ = [] +else: + _database_mod = sys.modules["database"] + +for _sub in ["_client", "import_jobs", "conversations"]: + _full = f"database.{_sub}" + if _full not in sys.modules: + _m = _stub_module(_full) + setattr(_database_mod, _sub, _m) + + +def _faithful_document_id_from_seed(seed: str) -> str: + # Mirrors database._client.document_id_from_seed (kept in sync intentionally): + # SHA-256 of the seed, first 16 bytes reinterpreted as a UUID. Deterministic. + # NOTE: copied rather than imported because the real module initialises Firestore + # at import time. A follow-up could move the primitive to a Firestore-free module + # so tests import it directly and eliminate any drift risk. + seed_hash = hashlib.sha256(seed.encode("utf-8")).digest() + return str(uuid_lib.UUID(bytes=seed_hash[:16], version=4)) + + +class _FakeConversationStore: + """In-memory stand-in for Firestore's atomic create-if-absent (document.create()).""" + + def __init__(self): + self.docs = {} + self.fail_ids = set() # ids whose create() should raise (resilience testing) + + def reset(self): + self.docs = {} + self.fail_ids = set() + + def create_conversation_if_absent(self, uid, data): + cid = data["id"] + if cid in self.fail_ids: + raise RuntimeError("simulated firestore error") + if cid in self.docs: + return False # already exists -> skipped (mirrors AlreadyExists -> False) + self.docs[cid] = data + return True + + +_store = _FakeConversationStore() + +sys.modules["database._client"].db = MagicMock() +sys.modules["database._client"].document_id_from_seed = _faithful_document_id_from_seed +sys.modules["database.conversations"].create_conversation_if_absent = _store.create_conversation_if_absent +sys.modules["database.import_jobs"].create_import_job = MagicMock() +sys.modules["database.import_jobs"].update_import_job = MagicMock() + +# utils.notifications.send_notification pulls FCM/Firebase at import — stub just it. +# (utils / utils.imports stay real so the real limitless module is importable.) +sys.modules["utils.notifications"] = MagicMock() + +import utils.imports.limitless as limitless # noqa: E402 + +UID = "user-abc" +FN_A = "2025-10-08_07h00m25s_Morning-standup.md" +FN_B = "2025-10-09_09h15m00s_Design-review.md" + + +def _lifelog_md(first_line_text: str = "Hello team, let's begin.") -> str: + return ( + "# Morning Standup\n\n" + "## Summary\n\n" + "### Key point\n\n" + f"> [1](#startMs=1000&endMs=5000): {first_line_text}\n" + "> [2](#startMs=5000&endMs=9000): Sounds good to me.\n" + ) + + +def _zip_bytes(files: dict) -> bytes: + """files: {in_zip_path: markdown_content} -> ZIP bytes.""" + buf = io.BytesIO() + with ZipFile(buf, "w") as zf: + for path, content in files.items(): + zf.writestr(path, content) + return buf.getvalue() + + +def _run_import(tmp_path, zip_data: bytes, uid: str = UID, job_id: str = "job-1"): + """Write the zip to disk and run the (real) import worker.""" + zip_path = tmp_path / "export.zip" + zip_path.write_bytes(zip_data) + limitless.process_limitless_import(job_id, uid, str(zip_path)) + + +# --------------------------------------------------------------------------- # +# Helper-level behaviour +# --------------------------------------------------------------------------- # + + +def test_conversation_id_is_deterministic_and_timestamp_keyed(): + id1 = limitless.conversation_id_for_lifelog(UID, FN_A) + id2 = limitless.conversation_id_for_lifelog(UID, FN_A) + + assert id1 == id2, "same (uid, filename) must yield the same ID" + uuid_lib.UUID(id1) # must be a valid UUID string + + # Same start-time, different title slug -> same ID (survives Limitless re-titling). + retitled = "2025-10-08_07h00m25s_Completely-different-title.md" + assert limitless.conversation_id_for_lifelog(UID, retitled) == id1 + + # Different start-time -> different ID. + assert limitless.conversation_id_for_lifelog(UID, FN_B) != id1 + # Different user -> different ID. + assert limitless.conversation_id_for_lifelog("user-xyz", FN_A) != id1 + + +def test_unparseable_filename_falls_back_to_full_path(): + # Deterministic for the same path; distinct for different paths. + a = limitless.conversation_id_for_lifelog(UID, "no-timestamp.md") + b = limitless.conversation_id_for_lifelog(UID, "no-timestamp.md") + c = limitless.conversation_id_for_lifelog(UID, "other-no-timestamp.md") + assert a == b and a != c + # Same basename under different folders must NOT collide (full path is the fallback). + d = limitless.conversation_id_for_lifelog(UID, "a/lifelogs/note.md") + e = limitless.conversation_id_for_lifelog(UID, "b/lifelogs/note.md") + assert d != e + + +# --------------------------------------------------------------------------- # +# End-to-end idempotency through process_limitless_import +# --------------------------------------------------------------------------- # + + +def test_reimport_same_export_creates_no_duplicates(tmp_path): + _store.reset() + zip_data = _zip_bytes({f"lifelogs/{FN_A}": _lifelog_md(), f"lifelogs/{FN_B}": _lifelog_md("Design review.")}) + + _run_import(tmp_path, zip_data) + after_first = dict(_store.docs) + _run_import(tmp_path, zip_data) + + assert len(after_first) == 2, "both lifelogs imported on first run" + assert set(_store.docs) == set(after_first), "re-import must not add or change document IDs" + + +def test_reimport_preserves_user_edits(tmp_path): + """A re-import must skip an already-imported conversation, not overwrite edits.""" + _store.reset() + zip_data = _zip_bytes({f"lifelogs/{FN_A}": _lifelog_md()}) + + _run_import(tmp_path, zip_data) + (conv_id,) = list(_store.docs) + # Simulate the user editing the imported conversation inside Omi. + _store.docs[conv_id]["structured"]["title"] = "My edited title" + + _run_import(tmp_path, zip_data) # same export again + + assert len(_store.docs) == 1, "no duplicate created" + assert _store.docs[conv_id]["structured"]["title"] == "My edited title", "edit must survive re-import" + + +def test_distinct_lifelogs_get_distinct_ids(tmp_path): + _store.reset() + zip_data = _zip_bytes({f"lifelogs/{FN_A}": _lifelog_md(), f"lifelogs/{FN_B}": _lifelog_md("Design review.")}) + + _run_import(tmp_path, zip_data) + + assert len(_store.docs) == 2, "two different lifelogs must map to two different IDs" + + +def test_retitled_lifelog_is_deduped_across_imports(tmp_path): + """Same lifelog re-exported with a different title slug must not duplicate.""" + _store.reset() + original = "2025-10-08_07h00m25s_Morning-standup.md" + retitled = "2025-10-08_07h00m25s_Daily-sync.md" + + _run_import(tmp_path, _zip_bytes({f"lifelogs/{original}": _lifelog_md()})) + _run_import(tmp_path, _zip_bytes({f"lifelogs/{retitled}": _lifelog_md()})) + + assert len(_store.docs) == 1, "re-titled re-export of the same lifelog must dedupe" + + +def test_duplicate_basename_in_archive_does_not_overwrite(tmp_path): + """Two entries that resolve to the same ID within one archive: first wins, no clobber.""" + _store.reset() + zip_data = _zip_bytes( + { + f"a/lifelogs/{FN_A}": _lifelog_md("FIRST occurrence content."), + f"b/lifelogs/{FN_A}": _lifelog_md("SECOND occurrence content."), + } + ) + + _run_import(tmp_path, zip_data) + + assert len(_store.docs) == 1, "same-identity entries collapse to one conversation" + (conv_id,) = list(_store.docs) + assert _store.docs[conv_id]["transcript_segments"][0]["text"] == "FIRST occurrence content." + + +def test_nested_unparseable_basenames_do_not_collide(tmp_path): + """Distinct lifelogs with unparseable names sharing a basename under different + folders must both import (full path is the fallback identity, not the basename).""" + _store.reset() + zip_data = _zip_bytes( + { + "a/lifelogs/note.md": _lifelog_md("Folder A content."), + "b/lifelogs/note.md": _lifelog_md("Folder B content."), + } + ) + + _run_import(tmp_path, zip_data) + + assert len(_store.docs) == 2, "same basename in different folders must not be deduped when unparseable" + + +def test_persisted_id_is_deterministic_not_random(tmp_path): + _store.reset() + _run_import(tmp_path, _zip_bytes({f"lifelogs/{FN_A}": _lifelog_md()})) + + assert list(_store.docs) == [limitless.conversation_id_for_lifelog(UID, FN_A)] + + +def test_different_users_do_not_collide(tmp_path): + _store.reset() + _run_import(tmp_path, _zip_bytes({f"lifelogs/{FN_A}": _lifelog_md()}), uid="user-a", job_id="job-a") + _run_import(tmp_path, _zip_bytes({f"lifelogs/{FN_A}": _lifelog_md()}), uid="user-b", job_id="job-b") + + assert len(_store.docs) == 2, "same export imported by two users must not share conversation IDs" + + +def test_create_error_is_isolated_per_file(tmp_path): + """A create failure on one lifelog must not abort the rest of the import.""" + _store.reset() + _store.fail_ids.add(limitless.conversation_id_for_lifelog(UID, FN_A)) + + _run_import(tmp_path, _zip_bytes({f"lifelogs/{FN_A}": _lifelog_md(), f"lifelogs/{FN_B}": _lifelog_md("ok")})) + + # FN_A's create raised and was caught per-file; FN_B still imported. + assert list(_store.docs) == [limitless.conversation_id_for_lifelog(UID, FN_B)] diff --git a/backend/utils/imports/limitless.py b/backend/utils/imports/limitless.py index f1499495a96..1341dbe91b5 100644 --- a/backend/utils/imports/limitless.py +++ b/backend/utils/imports/limitless.py @@ -16,6 +16,7 @@ import database.import_jobs as import_jobs_db import database.conversations as conversations_db +from database._client import document_id_from_seed from models.conversation import AppResult, Conversation from models.conversation_enums import CategoryEnum, ConversationSource, ConversationStatus from models.structured import Structured @@ -198,6 +199,35 @@ def _create_overview_from_transcript(segments: List[TranscriptSegment], max_char return overview if overview else "Imported from Limitless" +# Namespace prefix for deterministic Limitless conversation IDs. NEVER CHANGE this +# string: it is baked into the ID of every already-imported conversation, so a new +# value would orphan them and re-create duplicates on the next import. +LIMITLESS_IMPORT_ID_NAMESPACE = "limitless" + + +def conversation_id_for_lifelog(uid: str, lifelog_path: str) -> str: + """Deterministic conversation ID for a Limitless lifelog file. + + Keyed on (uid, stable lifelog identity) via the shared ``document_id_from_seed`` + primitive, so re-importing the same export resolves to the same ID and the + importer can skip lifelogs it has already stored (idempotent import). + + The identity is the lifelog's start timestamp parsed from the filename + (e.g. ``2025-10-08_07h00m25s_Title-slug.md`` -> ``2025-10-08T07:00:25+00:00``). + The mutable title slug is deliberately excluded so that if Limitless + regenerates a lifelog's title between exports, the re-import still maps to the + same conversation instead of creating a near-duplicate. A single pendant cannot + start two lifelogs in the same second, so the timestamp alone identifies a + lifelog. If the filename carries no parseable timestamp, the lifelog's full path + within the export is used as the fallback identity — not just its basename — so + distinct files that share a basename in different folders don't collide (and get + one of them silently skipped). + """ + started_at, _title_slug = parse_lifelog_filename(Path(lifelog_path).name) + identity = started_at.isoformat() if started_at else lifelog_path + return document_id_from_seed(f"{LIMITLESS_IMPORT_ID_NAMESPACE}:{uid}:{identity}") + + def process_limitless_import(job_id: str, uid: str, zip_path: str, language_code: str = 'en') -> None: """ Background worker to process a Limitless ZIP export using LIGHT IMPORT mode. @@ -266,6 +296,7 @@ def process_limitless_import(job_id: str, uid: str, zip_path: str, language_code processed_files = 0 conversations_created = 0 + conversations_skipped = 0 errors = [] for lifelog_path in lifelog_files: @@ -282,6 +313,9 @@ def process_limitless_import(job_id: str, uid: str, zip_path: str, language_code import_jobs_db.update_import_job(job_id, {'processed_files': processed_files}) continue + # Deterministic, idempotent conversation ID (keyed on lifelog identity). + conversation_id = conversation_id_for_lifelog(uid, lifelog_path) + # Calculate finished_at from last segment if segments and started_at: last_segment_end = max(seg.end for seg in segments) @@ -311,9 +345,9 @@ def process_limitless_import(job_id: str, uid: str, zip_path: str, language_code events=[], ) - # Create conversation object directly + # Create conversation object directly (no AI processing). conversation = Conversation( - id=str(uuid.uuid4()), + id=conversation_id, created_at=started_at, # Use started_at as created_at for proper ordering started_at=started_at, finished_at=finished_at, @@ -326,9 +360,16 @@ def process_limitless_import(job_id: str, uid: str, zip_path: str, language_code discarded=False, ) - # Save directly to database (skip all AI processing) - conversations_db.upsert_conversation(uid, conversation.dict()) - conversations_created += 1 + # Create-if-absent so re-importing the same export skips lifelogs already + # stored instead of overwriting them. This is atomic (Firestore create()), + # so it never duplicates and never clobbers edits a user may have made to a + # previously-imported conversation ("first import wins"). Picking up + # Limitless's own later edits to a lifelog is intentionally out of scope. + if conversations_db.create_conversation_if_absent(uid, conversation.dict()): + conversations_created += 1 + else: + conversations_skipped += 1 + logger.info(f"[Limitless Import] Skipped already-imported lifelog: {filename}") except Exception as e: error_msg = f"Error processing {lifelog_path}: {str(e)}" @@ -344,15 +385,23 @@ def process_limitless_import(job_id: str, uid: str, zip_path: str, language_code { 'processed_files': processed_files, 'conversations_created': conversations_created, + 'conversations_skipped': conversations_skipped, }, ) + logger.info( + f"[Limitless Import] Done: {conversations_created} created, " + f"{conversations_skipped} skipped (already imported), {len(errors)} errors" + ) + # Mark as completed final_status = ImportJobStatus.completed.value error_msg = None if errors: - if conversations_created == 0: + # Only a hard failure if nothing was created and nothing was skipped + # (a re-import that skips everything is a success, not a failure). + if conversations_created == 0 and conversations_skipped == 0: final_status = ImportJobStatus.failed.value error_msg = f"All files failed to process. First error: {errors[0]}" else: @@ -365,19 +414,32 @@ def process_limitless_import(job_id: str, uid: str, zip_path: str, language_code 'status': final_status, 'completed_at': datetime.now(timezone.utc).isoformat(), 'error': error_msg, + # Authoritative final counts (the periodic update can lag if the last + # files were skipped/empty and short-circuited before it ran). + 'conversations_created': conversations_created, + 'conversations_skipped': conversations_skipped, }, ) # Send push notification if final_status == ImportJobStatus.completed.value: + complete_body = f"Successfully imported {conversations_created} conversations from your Limitless data." + if conversations_skipped: + complete_body = ( + f"Imported {conversations_created} new conversations " + f"({conversations_skipped} already imported) from your Limitless data." + ) + if errors: + complete_body += f" {len(errors)} file(s) could not be processed." send_notification( user_id=uid, title="Limitless Import Complete! 🎉", - body=f"Successfully imported {conversations_created} conversations from your Limitless data.", + body=complete_body, data={ 'type': 'import_complete', 'job_id': job_id, 'conversations_created': str(conversations_created), + 'conversations_skipped': str(conversations_skipped), }, ) else: