diff --git a/code_review_graph/communities.py b/code_review_graph/communities.py index dfd7a97c..67ec3b70 100644 --- a/code_review_graph/communities.py +++ b/code_review_graph/communities.py @@ -493,36 +493,42 @@ def store_communities( # that are tightly coupled to the DB transaction lifecycle. conn = store._conn - # Clear existing data - conn.execute("DELETE FROM communities") - conn.execute("UPDATE nodes SET community_id = NULL") - - count = 0 - for comm in communities: - cursor = conn.execute( - """INSERT INTO communities (name, level, cohesion, size, dominant_language, description) - VALUES (?, ?, ?, ?, ?, ?)""", - ( - comm["name"], - comm.get("level", 0), - comm.get("cohesion", 0.0), - comm["size"], - comm.get("dominant_language", ""), - comm.get("description", ""), - ), - ) - community_id = cursor.lastrowid - - # Update community_id on member nodes - member_qns = comm.get("members", []) - for qn in member_qns: - conn.execute( - "UPDATE nodes SET community_id = ? WHERE qualified_name = ?", - (community_id, qn), + # Wrap in explicit transaction so the DELETE + INSERT + UPDATE + # sequence is atomic — no partial community data on crash. + conn.execute("BEGIN IMMEDIATE") + try: + conn.execute("DELETE FROM communities") + conn.execute("UPDATE nodes SET community_id = NULL") + + count = 0 + for comm in communities: + cursor = conn.execute( + """INSERT INTO communities (name, level, cohesion, size, dominant_language, description) + VALUES (?, ?, ?, ?, ?, ?)""", + ( + comm["name"], + comm.get("level", 0), + comm.get("cohesion", 0.0), + comm["size"], + comm.get("dominant_language", ""), + comm.get("description", ""), + ), ) - count += 1 - - conn.commit() + community_id = cursor.lastrowid + + # Update community_id on member nodes + member_qns = comm.get("members", []) + for qn in member_qns: + conn.execute( + "UPDATE nodes SET community_id = ? 
WHERE qualified_name = ?", + (community_id, qn), + ) + count += 1 + + conn.commit() + except BaseException: + conn.rollback() + raise return count diff --git a/code_review_graph/embeddings.py b/code_review_graph/embeddings.py index 7324c6ca..fcac3af2 100644 --- a/code_review_graph/embeddings.py +++ b/code_review_graph/embeddings.py @@ -366,7 +366,10 @@ def __init__( self.provider = get_provider(provider, model=model) self.available = self.provider is not None self.db_path = Path(db_path) - self._conn = sqlite3.connect(str(self.db_path), timeout=30, check_same_thread=False) + self._conn = sqlite3.connect( + str(self.db_path), timeout=30, check_same_thread=False, + isolation_level=None, + ) self._conn.row_factory = sqlite3.Row self._conn.executescript(_EMBEDDINGS_SCHEMA) diff --git a/code_review_graph/flows.py b/code_review_graph/flows.py index fbc08f21..1cfb4964 100644 --- a/code_review_graph/flows.py +++ b/code_review_graph/flows.py @@ -314,41 +314,47 @@ def store_flows(store: GraphStore, flows: list[dict]) -> int: # tightly coupled to the DB transaction lifecycle. conn = store._conn - # Clear old data. - conn.execute("DELETE FROM flow_memberships") - conn.execute("DELETE FROM flows") - - count = 0 - for flow in flows: - path_json = json.dumps(flow.get("path", [])) - conn.execute( - """INSERT INTO flows - (name, entry_point_id, depth, node_count, file_count, - criticality, path_json) - VALUES (?, ?, ?, ?, ?, ?, ?)""", - ( - flow["name"], - flow["entry_point_id"], - flow["depth"], - flow["node_count"], - flow["file_count"], - flow["criticality"], - path_json, - ), - ) - flow_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] - - # Insert memberships. - node_ids = flow.get("path", []) - for position, node_id in enumerate(node_ids): + # Wrap the full DELETE + INSERT sequence in an explicit transaction + # so partial writes cannot occur if an exception interrupts the loop. 
+ conn.execute("BEGIN IMMEDIATE") + try: + conn.execute("DELETE FROM flow_memberships") + conn.execute("DELETE FROM flows") + + count = 0 + for flow in flows: + path_json = json.dumps(flow.get("path", [])) conn.execute( - "INSERT OR IGNORE INTO flow_memberships (flow_id, node_id, position) " - "VALUES (?, ?, ?)", - (flow_id, node_id, position), + """INSERT INTO flows + (name, entry_point_id, depth, node_count, file_count, + criticality, path_json) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + ( + flow["name"], + flow["entry_point_id"], + flow["depth"], + flow["node_count"], + flow["file_count"], + flow["criticality"], + path_json, + ), ) - count += 1 - - conn.commit() + flow_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] + + # Insert memberships. + node_ids = flow.get("path", []) + for position, node_id in enumerate(node_ids): + conn.execute( + "INSERT OR IGNORE INTO flow_memberships (flow_id, node_id, position) " + "VALUES (?, ?, ?)", + (flow_id, node_id, position), + ) + count += 1 + + conn.commit() + except BaseException: + conn.rollback() + raise return count diff --git a/code_review_graph/graph.py b/code_review_graph/graph.py index 2dfa97fc..bccb1e06 100644 --- a/code_review_graph/graph.py +++ b/code_review_graph/graph.py @@ -126,7 +126,8 @@ def __init__(self, db_path: str | Path) -> None: self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) self._conn = sqlite3.connect( - str(self.db_path), timeout=30, check_same_thread=False + str(self.db_path), timeout=30, check_same_thread=False, + isolation_level=None, # Disable implicit transactions (#135) ) self._conn.row_factory = sqlite3.Row self._conn.execute("PRAGMA journal_mode=WAL") @@ -236,6 +237,16 @@ def store_file_nodes_edges( self, file_path: str, nodes: list[NodeInfo], edges: list[EdgeInfo], fhash: str = "" ) -> None: """Atomically replace all data for a file.""" + # Defense-in-depth: flush any pending transaction before BEGIN + # IMMEDIATE. 
def commit(self) -> None:
    """Commit whatever transaction is currently open on ``self._conn``."""
    connection = self._conn
    connection.commit()


def rollback(self) -> None:
    """Roll back whatever transaction is currently open on ``self._conn``."""
    connection = self._conn
    connection.rollback()
+ if stale_files: + store.commit() total_nodes = 0 total_edges = 0 @@ -463,12 +468,14 @@ def incremental_update( # Separate deleted/unparseable files from files that need re-parsing to_parse: list[str] = [] + removed_any = False for rel_path in all_files: if _should_ignore(rel_path, ignore_patterns): continue abs_path = repo_root / rel_path if not abs_path.is_file(): store.remove_file_data(str(abs_path)) + removed_any = True continue if parser.detect_language(abs_path) is None: continue @@ -483,6 +490,11 @@ def incremental_update( pass to_parse.append(rel_path) + # Persist deletions before store_file_nodes_edges() opens its own + # explicit transaction — avoids nested transaction errors. + if removed_any: + store.commit() + use_serial = os.environ.get("CRG_SERIAL_PARSE", "") == "1" if use_serial or len(to_parse) < 8: diff --git a/code_review_graph/registry.py b/code_review_graph/registry.py index 5572ae9b..1ecb1f6a 100644 --- a/code_review_graph/registry.py +++ b/code_review_graph/registry.py @@ -196,7 +196,10 @@ def get(self, db_path: str) -> sqlite3.Connection: logger.debug("Failed to close evicted connection: %s", evict_key) logger.debug("Evicted connection: %s", evict_key) - conn = sqlite3.connect(key, timeout=30, check_same_thread=False) + conn = sqlite3.connect( + key, timeout=30, check_same_thread=False, + isolation_level=None, + ) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA busy_timeout=5000") diff --git a/code_review_graph/tools/build.py b/code_review_graph/tools/build.py index a8a7bd0b..7ddf3fe4 100644 --- a/code_review_graph/tools/build.py +++ b/code_review_graph/tools/build.py @@ -126,13 +126,19 @@ def _run_postprocess( def _compute_summaries(store: Any) -> None: - """Populate community_summaries, flow_snapshots, and risk_index tables.""" + """Populate community_summaries, flow_snapshots, and risk_index tables. 
def test_store_after_remove_no_transaction_error(self):
    """Regression test for #135: storing right after a remove must succeed.

    store_file_nodes_edges called after remove_file_data must not raise
    'cannot start a transaction within a transaction'. The removed file
    must stay removed and the file that was never touched must survive.
    """
    # Seed initial data for two files.
    for path in ("/test/a.py", "/test/b.py"):
        self.store.store_file_nodes_edges(
            path, [self._make_file_node(path)], [],
        )

    # Without the isolation_level=None fix, this would leave an
    # implicit transaction open and the next call would crash.
    self.store.remove_file_data("/test/a.py")

    # Must not raise sqlite3.OperationalError.
    self.store.store_file_nodes_edges(
        "/test/c.py", [self._make_file_node("/test/c.py")], [],
    )

    assert self.store.get_node("/test/a.py") is None
    # The untouched file must not be affected by the remove/store pair.
    assert self.store.get_node("/test/b.py") is not None
    assert self.store.get_node("/test/c.py") is not None


def test_store_after_multiple_removes_no_transaction_error(self):
    """Regression test for #181: several removes followed by a store.

    Mirrors full_build's stale-file purge, which issues multiple
    remove_file_data calls in a row with no explicit commit in between.
    The following store must succeed, every purged file must be gone,
    and every file that was not purged must still be present.
    """
    # Seed data for several files.
    seeded = [f"/test/file_{i}.py" for i in range(5)]
    for path in seeded:
        self.store.store_file_nodes_edges(
            path, [self._make_file_node(path)], [],
        )

    # Purge the first three; the last two must remain untouched.
    removed, kept = seeded[:3], seeded[3:]
    for path in removed:
        self.store.remove_file_data(path)

    # Next store call must succeed regardless of prior connection state.
    new_path = "/test/new_file.py"
    self.store.store_file_nodes_edges(
        new_path, [self._make_file_node(new_path)], [],
    )

    assert self.store.get_node(new_path) is not None
    for path in removed:
        assert self.store.get_node(path) is None, path
    for path in kept:
        assert self.store.get_node(path) is not None, path