diff --git a/code_review_graph/communities.py b/code_review_graph/communities.py index 02a556f..5f31d34 100644 --- a/code_review_graph/communities.py +++ b/code_review_graph/communities.py @@ -551,6 +551,9 @@ def store_communities( # that are tightly coupled to the DB transaction lifecycle. conn = store._conn + if conn.in_transaction: + logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE") + conn.rollback() # Wrap in explicit transaction so the DELETE + INSERT + UPDATE # sequence is atomic — no partial community data on crash. conn.execute("BEGIN IMMEDIATE") @@ -561,7 +564,8 @@ def store_communities( count = 0 for comm in communities: cursor = conn.execute( - """INSERT INTO communities (name, level, cohesion, size, dominant_language, description) + """INSERT INTO communities + (name, level, cohesion, size, dominant_language, description) VALUES (?, ?, ?, ?, ?, ?)""", ( comm["name"], diff --git a/code_review_graph/flows.py b/code_review_graph/flows.py index 1cfb496..9f8a730 100644 --- a/code_review_graph/flows.py +++ b/code_review_graph/flows.py @@ -314,6 +314,9 @@ def store_flows(store: GraphStore, flows: list[dict]) -> int: # tightly coupled to the DB transaction lifecycle. conn = store._conn + if conn.in_transaction: + logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE") + conn.rollback() # Wrap the full DELETE + INSERT sequence in an explicit transaction # so partial writes cannot occur if an exception interrupts the loop. conn.execute("BEGIN IMMEDIATE") diff --git a/code_review_graph/graph.py b/code_review_graph/graph.py index bccb1e0..50d4353 100644 --- a/code_review_graph/graph.py +++ b/code_review_graph/graph.py @@ -237,16 +237,9 @@ def store_file_nodes_edges( self, file_path: str, nodes: list[NodeInfo], edges: list[EdgeInfo], fhash: str = "" ) -> None: """Atomically replace all data for a file.""" - # Defense-in-depth: flush any pending transaction before BEGIN - # IMMEDIATE. The root cause (implicit transactions from legacy - # isolation_level="") is fixed by setting isolation_level=None in - # __init__, but external code accessing _conn directly (e.g. - # _compute_summaries, flows.py, communities.py) could still leave - # a transaction open. - # See: https://github.com/tirth8205/code-review-graph/issues/135 if self._conn.in_transaction: - logger.warning("Flushing unexpected open transaction before BEGIN IMMEDIATE") - self._conn.commit() + logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE") + self._conn.rollback() self._conn.execute("BEGIN IMMEDIATE") try: self.remove_file_data(file_path) @@ -274,6 +267,7 @@ def commit(self) -> None: self._conn.commit() def rollback(self) -> None: + """Rollback the current transaction.""" self._conn.rollback() # --- Read operations --- diff --git a/code_review_graph/search.py b/code_review_graph/search.py index d2eb84e..70042f5 100644 --- a/code_review_graph/search.py +++ b/code_review_graph/search.py @@ -35,23 +35,28 @@ def rebuild_fts_index(store: GraphStore) -> int: # the FTS5 virtual table DDL, which is tightly coupled to SQLite internals. conn = store._conn - # Drop and recreate the FTS table to avoid content-sync mismatch issues - conn.execute("DROP TABLE IF EXISTS nodes_fts") - conn.execute(""" - CREATE VIRTUAL TABLE nodes_fts USING fts5( - name, qualified_name, file_path, signature, - tokenize='porter unicode61' - ) - """) - conn.commit() - - # Populate from nodes table - conn.execute(""" - INSERT INTO nodes_fts(rowid, name, qualified_name, file_path, signature) - SELECT id, name, qualified_name, file_path, COALESCE(signature, '') - FROM nodes - """) - conn.commit() + if conn.in_transaction: + logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE") + conn.rollback() + conn.execute("BEGIN IMMEDIATE") + try: + # Drop and recreate the FTS table with content sync to match migration v5 + conn.execute("DROP TABLE IF EXISTS nodes_fts") + conn.execute(""" + CREATE VIRTUAL TABLE nodes_fts USING fts5( + name, qualified_name, file_path, signature, + content='nodes', content_rowid='rowid', + tokenize='porter unicode61' + ) + """) + + # Rebuild from the content table (nodes) using the FTS5 rebuild command + conn.execute("INSERT INTO nodes_fts(nodes_fts) VALUES('rebuild')") + + conn.commit() + except BaseException: + conn.rollback() + raise count = conn.execute("SELECT count(*) FROM nodes_fts").fetchone()[0] logger.info("FTS index rebuilt: %d rows indexed", count) diff --git a/code_review_graph/tools/build.py b/code_review_graph/tools/build.py index 7ddf3fe..46fd02c 100644 --- a/code_review_graph/tools/build.py +++ b/code_review_graph/tools/build.py @@ -407,6 +407,7 @@ def run_postprocess( fts_count = rebuild_fts_index(store) result["fts_indexed"] = fts_count except (sqlite3.OperationalError, ImportError) as e: + store.rollback() logger.warning("FTS index rebuild failed: %s", e) warnings.append(f"FTS index rebuild failed: {type(e).__name__}: {e}") @@ -419,6 +420,7 @@ def run_postprocess( count = _store_flows(store, traced) result["flows_detected"] = count except (sqlite3.OperationalError, ImportError) as e: + store.rollback() logger.warning("Flow detection failed: %s", e) warnings.append(f"Flow detection failed: {type(e).__name__}: {e}") @@ -435,6 +437,7 @@ def run_postprocess( count = _store_communities(store, comms) result["communities_detected"] = count except (sqlite3.OperationalError, ImportError) as e: + store.rollback() logger.warning("Community detection failed: %s", e) warnings.append(f"Community detection failed: {type(e).__name__}: {e}") diff --git a/tests/test_fts_sync.py b/tests/test_fts_sync.py new file mode 100644 index 0000000..41aecf8 --- /dev/null +++ b/tests/test_fts_sync.py @@ -0,0 +1,75 @@ +"""Tests for FTS5 content sync robustness.""" + +import sqlite3 +import tempfile +from pathlib import Path + +import pytest + +from code_review_graph.graph import GraphStore +from code_review_graph.parser import NodeInfo +from code_review_graph.search import rebuild_fts_index + +@pytest.fixture +def store(): + """Create a temporary GraphStore for testing.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: + db_path = tmp.name + store = GraphStore(db_path) + yield store + store.close() + Path(db_path).unlink(missing_ok=True) + +class TestFTSSync: + def test_fts_rebuild_syncs_with_nodes(self, store): + """Test that rebuild_fts_index properly populates from nodes table.""" + # 1. Add some nodes + node1 = NodeInfo( + kind="Function", name="calculate_total", file_path="app.py", + line_start=1, line_end=5, language="python" + ) + node2 = NodeInfo( + kind="Class", name="OrderProcessor", file_path="app.py", + line_start=10, line_end=50, language="python" + ) + store.store_file_nodes_edges("app.py", [node1, node2], []) + + # 2. Rebuild FTS + count = rebuild_fts_index(store) + assert count == 2 + + # 3. Verify FTS content via search + # We query the virtual table directly to ensure it has the data + fts_rows = store._conn.execute( + "SELECT name FROM nodes_fts WHERE name MATCH 'calculate*'" + ).fetchall() + assert len(fts_rows) == 1 + assert fts_rows[0]["name"] == "calculate_total" + + def test_fts_rebuild_clears_old_data(self, store): + """Test that rebuild_fts_index clears existing FTS data before repopulating.""" + # 1. Add and index one node + node1 = NodeInfo( + kind="Function", name="old_func", file_path="old.py", + line_start=1, line_end=5, language="python" + ) + store.store_file_nodes_edges("old.py", [node1], []) + rebuild_fts_index(store) + + # 2. Delete the file/nodes + store.remove_file_data("old.py") + store.commit() + + # 3. Add a new node + node2 = NodeInfo( + kind="Function", name="new_func", file_path="new.py", + line_start=1, line_end=5, language="python" + ) + store.store_file_nodes_edges("new.py", [node2], []) + + # 4. Rebuild FTS - should ONLY have new_func + rebuild_fts_index(store) + + fts_rows = store._conn.execute("SELECT name FROM nodes_fts").fetchall() + assert len(fts_rows) == 1 + assert fts_rows[0]["name"] == "new_func" diff --git a/tests/test_transactions.py b/tests/test_transactions.py new file mode 100644 index 0000000..a042adc --- /dev/null +++ b/tests/test_transactions.py @@ -0,0 +1,116 @@ +"""Tests for SQLite transaction robustness and nesting scenarios.""" + +import sqlite3 +import tempfile +import logging +from pathlib import Path +from unittest.mock import patch + +import pytest + +from code_review_graph.graph import GraphStore +from code_review_graph.parser import NodeInfo, EdgeInfo +from code_review_graph.communities import store_communities +from code_review_graph.flows import store_flows + +@pytest.fixture +def store(): + """Create a temporary GraphStore for testing.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: + db_path = tmp.name + store = GraphStore(db_path) + yield store + store.close() + Path(db_path).unlink(missing_ok=True) + +class TestTransactionRobustness: + def test_nested_transaction_guard_in_store_file(self, store, caplog): + """Test that store_file_nodes_edges handles an already open transaction.""" + # Manually open a transaction + store._conn.execute("BEGIN") + store._conn.execute("INSERT INTO metadata (key, value) VALUES (?, ?)", ("test", "val")) + assert store._conn.in_transaction + + # This should trigger the guard, rollback the uncommitted insert, and start a new transaction + with caplog.at_level(logging.WARNING): + store.store_file_nodes_edges("test.py", [], []) + + assert "Rolling back uncommitted transaction before BEGIN IMMEDIATE" in caplog.text + assert not store._conn.in_transaction + + # Verify the "val" was rolled back + assert store.get_metadata("test") is None + + def test_atomic_community_storage(self, store): + """Test that store_communities is atomic and handles existing transactions.""" + communities = [ + {"name": "comm1", "size": 1, "members": ["node1"]} + ] + + # Leave a transaction open + store._conn.execute("BEGIN") + store._conn.execute("INSERT INTO metadata (key, value) VALUES ('leak', 'stale')") + + # Should rollback the 'leak' and successfully store communities + store_communities(store, communities) + + assert store.get_metadata("leak") is None + + # Verify communities table + count = store._conn.execute("SELECT count(*) FROM communities").fetchone()[0] + assert count == 1 + + def test_atomic_flow_storage(self, store): + """Test that store_flows is atomic and handles existing transactions.""" + flows = [ + { + "name": "flow1", "entry_point_id": 1, "depth": 1, + "node_count": 1, "file_count": 1, "criticality": 0.5, + "path": [1] + } + ] + + # Leave a transaction open + store._conn.execute("BEGIN") + store._conn.execute("INSERT INTO metadata (key, value) VALUES ('leak', 'stale')") + + # Should rollback and store flows + store_flows(store, flows) + + assert store.get_metadata("leak") is None + count = store._conn.execute("SELECT count(*) FROM flows").fetchone()[0] + assert count == 1 + + def test_rollback_on_failure_in_batch_ops(self, store): + """Verify that store_file_nodes_edges rolls back if an operation fails inside.""" + # Pre-seed some data + node_keep = NodeInfo( + kind="File", name="keep", file_path="keep.py", + line_start=1, line_end=10, language="python" + ) + store.store_file_nodes_edges("keep.py", [node_keep], []) + + # Attempt to store new file but force a failure + node_fail = NodeInfo( + kind="File", name="fail", file_path="fail.py", + line_start=1, line_end=10, language="python" + ) + + with patch.object(store, 'upsert_node', side_effect=Exception("Simulated failure")): + with pytest.raises(Exception, match="Simulated failure"): + store.store_file_nodes_edges("fail.py", [node_fail], []) + + # Verify 'fail.py' data is NOT present + assert len(store.get_nodes_by_file("fail.py")) == 0 + # Verify 'keep.py' data IS still present + assert len(store.get_nodes_by_file("keep.py")) == 1 + + def test_public_rollback_api(self, store): + """Verify the new GraphStore.rollback() public method works.""" + store._conn.execute("BEGIN") + store._conn.execute("INSERT INTO metadata (key, value) VALUES ('rollback', 'me')") + assert store._conn.in_transaction + + store.rollback() + assert not store._conn.in_transaction + assert store.get_metadata("rollback") is None