Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions code_review_graph/communities.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,36 +493,42 @@ def store_communities(
# that are tightly coupled to the DB transaction lifecycle.
conn = store._conn

# Clear existing data
conn.execute("DELETE FROM communities")
conn.execute("UPDATE nodes SET community_id = NULL")

count = 0
for comm in communities:
cursor = conn.execute(
"""INSERT INTO communities (name, level, cohesion, size, dominant_language, description)
VALUES (?, ?, ?, ?, ?, ?)""",
(
comm["name"],
comm.get("level", 0),
comm.get("cohesion", 0.0),
comm["size"],
comm.get("dominant_language", ""),
comm.get("description", ""),
),
)
community_id = cursor.lastrowid

# Update community_id on member nodes
member_qns = comm.get("members", [])
for qn in member_qns:
conn.execute(
"UPDATE nodes SET community_id = ? WHERE qualified_name = ?",
(community_id, qn),
# Wrap in explicit transaction so the DELETE + INSERT + UPDATE
# sequence is atomic — no partial community data on crash.
conn.execute("BEGIN IMMEDIATE")
try:
conn.execute("DELETE FROM communities")
conn.execute("UPDATE nodes SET community_id = NULL")

count = 0
for comm in communities:
cursor = conn.execute(
"""INSERT INTO communities (name, level, cohesion, size, dominant_language, description)
VALUES (?, ?, ?, ?, ?, ?)""",
(
comm["name"],
comm.get("level", 0),
comm.get("cohesion", 0.0),
comm["size"],
comm.get("dominant_language", ""),
comm.get("description", ""),
),
)
count += 1

conn.commit()
community_id = cursor.lastrowid

# Update community_id on member nodes
member_qns = comm.get("members", [])
for qn in member_qns:
conn.execute(
"UPDATE nodes SET community_id = ? WHERE qualified_name = ?",
(community_id, qn),
)
count += 1

conn.commit()
except BaseException:
conn.rollback()
raise
return count


Expand Down
5 changes: 4 additions & 1 deletion code_review_graph/embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,10 @@ def __init__(
self.provider = get_provider(provider, model=model)
self.available = self.provider is not None
self.db_path = Path(db_path)
self._conn = sqlite3.connect(str(self.db_path), timeout=30, check_same_thread=False)
self._conn = sqlite3.connect(
str(self.db_path), timeout=30, check_same_thread=False,
isolation_level=None,
)
self._conn.row_factory = sqlite3.Row
self._conn.executescript(_EMBEDDINGS_SCHEMA)

Expand Down
72 changes: 39 additions & 33 deletions code_review_graph/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,41 +314,47 @@ def store_flows(store: GraphStore, flows: list[dict]) -> int:
# tightly coupled to the DB transaction lifecycle.
conn = store._conn

# Clear old data.
conn.execute("DELETE FROM flow_memberships")
conn.execute("DELETE FROM flows")

count = 0
for flow in flows:
path_json = json.dumps(flow.get("path", []))
conn.execute(
"""INSERT INTO flows
(name, entry_point_id, depth, node_count, file_count,
criticality, path_json)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
flow["name"],
flow["entry_point_id"],
flow["depth"],
flow["node_count"],
flow["file_count"],
flow["criticality"],
path_json,
),
)
flow_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]

# Insert memberships.
node_ids = flow.get("path", [])
for position, node_id in enumerate(node_ids):
# Wrap the full DELETE + INSERT sequence in an explicit transaction
# so partial writes cannot occur if an exception interrupts the loop.
conn.execute("BEGIN IMMEDIATE")
try:
conn.execute("DELETE FROM flow_memberships")
conn.execute("DELETE FROM flows")

count = 0
for flow in flows:
path_json = json.dumps(flow.get("path", []))
conn.execute(
"INSERT OR IGNORE INTO flow_memberships (flow_id, node_id, position) "
"VALUES (?, ?, ?)",
(flow_id, node_id, position),
"""INSERT INTO flows
(name, entry_point_id, depth, node_count, file_count,
criticality, path_json)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
flow["name"],
flow["entry_point_id"],
flow["depth"],
flow["node_count"],
flow["file_count"],
flow["criticality"],
path_json,
),
)
count += 1

conn.commit()
flow_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]

# Insert memberships.
node_ids = flow.get("path", [])
for position, node_id in enumerate(node_ids):
conn.execute(
"INSERT OR IGNORE INTO flow_memberships (flow_id, node_id, position) "
"VALUES (?, ?, ?)",
(flow_id, node_id, position),
)
count += 1

conn.commit()
except BaseException:
conn.rollback()
raise
return count


Expand Down
16 changes: 15 additions & 1 deletion code_review_graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ def __init__(self, db_path: str | Path) -> None:
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(
str(self.db_path), timeout=30, check_same_thread=False
str(self.db_path), timeout=30, check_same_thread=False,
isolation_level=None, # Disable implicit transactions (#135)
)
self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA journal_mode=WAL")
Expand Down Expand Up @@ -236,6 +237,16 @@ def store_file_nodes_edges(
self, file_path: str, nodes: list[NodeInfo], edges: list[EdgeInfo], fhash: str = ""
) -> None:
"""Atomically replace all data for a file."""
# Defense-in-depth: flush any pending transaction before BEGIN
# IMMEDIATE. The root cause (implicit transactions from legacy
# isolation_level="") is fixed by setting isolation_level=None in
# __init__, but external code accessing _conn directly (e.g.
# _compute_summaries, flows.py, communities.py) could still leave
# a transaction open.
# See: https://github.com/tirth8205/code-review-graph/issues/135
if self._conn.in_transaction:
logger.warning("Flushing unexpected open transaction before BEGIN IMMEDIATE")
self._conn.commit()
self._conn.execute("BEGIN IMMEDIATE")
try:
self.remove_file_data(file_path)
Expand All @@ -262,6 +273,9 @@ def get_metadata(self, key: str) -> Optional[str]:
def commit(self) -> None:
self._conn.commit()

def rollback(self) -> None:
    """Roll back any pending transaction on the underlying connection."""
    conn = self._conn
    conn.rollback()

# --- Read operations ---

def get_node(self, qualified_name: str) -> Optional[GraphNode]:
Expand Down
14 changes: 13 additions & 1 deletion code_review_graph/incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,13 @@ def full_build(repo_root: Path, store: GraphStore) -> dict:
# Purge stale data from files no longer on disk
existing_files = set(store.get_all_files())
current_abs = {str(repo_root / f) for f in files}
for stale in existing_files - current_abs:
stale_files = existing_files - current_abs
for stale in stale_files:
store.remove_file_data(stale)
# Ensure deletions are persisted before store_file_nodes_edges()
# starts its own explicit transaction via BEGIN IMMEDIATE.
if stale_files:
store.commit()

total_nodes = 0
total_edges = 0
Expand Down Expand Up @@ -463,12 +468,14 @@ def incremental_update(

# Separate deleted/unparseable files from files that need re-parsing
to_parse: list[str] = []
removed_any = False
for rel_path in all_files:
if _should_ignore(rel_path, ignore_patterns):
continue
abs_path = repo_root / rel_path
if not abs_path.is_file():
store.remove_file_data(str(abs_path))
removed_any = True
continue
if parser.detect_language(abs_path) is None:
continue
Expand All @@ -483,6 +490,11 @@ def incremental_update(
pass
to_parse.append(rel_path)

# Persist deletions before store_file_nodes_edges() opens its own
# explicit transaction — avoids nested transaction errors.
if removed_any:
store.commit()

use_serial = os.environ.get("CRG_SERIAL_PARSE", "") == "1"

if use_serial or len(to_parse) < 8:
Expand Down
5 changes: 4 additions & 1 deletion code_review_graph/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ def get(self, db_path: str) -> sqlite3.Connection:
logger.debug("Failed to close evicted connection: %s", evict_key)
logger.debug("Evicted connection: %s", evict_key)

conn = sqlite3.connect(key, timeout=30, check_same_thread=False)
conn = sqlite3.connect(
key, timeout=30, check_same_thread=False,
isolation_level=None,
)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
Expand Down
21 changes: 15 additions & 6 deletions code_review_graph/tools/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,19 @@ def _run_postprocess(


def _compute_summaries(store: Any) -> None:
"""Populate community_summaries, flow_snapshots, and risk_index tables."""
"""Populate community_summaries, flow_snapshots, and risk_index tables.

Each summary block (community_summaries, flow_snapshots, risk_index)
is wrapped in an explicit transaction so the DELETE + INSERT sequence
is atomic. If a table doesn't exist yet the block is silently skipped.
"""
import json as _json

conn = store._conn

# -- community_summaries --
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute("DELETE FROM community_summaries")
rows = conn.execute(
"SELECT id, name, size, dominant_language FROM communities"
Expand Down Expand Up @@ -168,11 +174,13 @@ def _compute_summaries(store: Any) -> None:
"VALUES (?, ?, ?, ?, ?, ?)",
(cid, cname, purpose, key_syms, csize, clang or ""),
)
conn.commit()
except sqlite3.OperationalError:
pass # Table may not exist yet
conn.rollback() # Table may not exist yet

# -- flow_snapshots --
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute("DELETE FROM flow_snapshots")
rows = conn.execute(
"SELECT id, name, entry_point_id, criticality, node_count, "
Expand Down Expand Up @@ -217,11 +225,13 @@ def _compute_summaries(store: Any) -> None:
(fid, fname, ep_name, _json.dumps(critical_path),
crit, ncount, fcount),
)
conn.commit()
except sqlite3.OperationalError:
pass
conn.rollback()

# -- risk_index --
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute("DELETE FROM risk_index")
# Per-node risk: caller_count, test coverage, security keywords
nodes = conn.execute(
Expand Down Expand Up @@ -266,10 +276,9 @@ def _compute_summaries(store: Any) -> None:
"VALUES (?, ?, ?, ?, ?, ?, datetime('now'))",
(nid, qn, risk, caller_count, coverage, sec_relevant),
)
conn.commit()
except sqlite3.OperationalError:
pass

conn.commit()
conn.rollback()


def build_or_update_graph(
Expand Down
45 changes: 45 additions & 0 deletions tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,51 @@ def test_store_file_nodes_edges(self):
result = self.store.get_nodes_by_file("/test/file.py")
assert len(result) == 2

def test_store_after_remove_no_transaction_error(self):
    """Regression for #135: remove_file_data must not leave an implicit
    transaction open that makes the next store_file_nodes_edges call
    fail with 'cannot start a transaction within a transaction'.
    """
    # Seed two files so there is existing data to remove.
    for path in ("/test/a.py", "/test/b.py"):
        self.store.store_file_nodes_edges(path, [self._make_file_node(path)], [])

    # Before the isolation_level=None fix, this left a transaction open.
    self.store.remove_file_data("/test/a.py")

    # The follow-up store must not raise sqlite3.OperationalError.
    self.store.store_file_nodes_edges(
        "/test/c.py", [self._make_file_node("/test/c.py")], []
    )

    assert self.store.get_node("/test/a.py") is None
    assert self.store.get_node("/test/c.py") is not None

def test_store_after_multiple_removes_no_transaction_error(self):
    """Regression for #181: full_build's stale-file purge issues several
    remove_file_data calls back to back; the connection must still accept
    a subsequent store_file_nodes_edges without a transaction error.
    """
    # Seed several files so there is data to purge.
    paths = [f"/test/file_{i}.py" for i in range(5)]
    for path in paths:
        self.store.store_file_nodes_edges(path, [self._make_file_node(path)], [])

    # Mimic the stale-file purge: consecutive deletes, no commit between.
    for path in paths[:3]:
        self.store.remove_file_data(path)

    # Whatever state the connection is in, the next write must succeed.
    fresh = "/test/new_file.py"
    self.store.store_file_nodes_edges(fresh, [self._make_file_node(fresh)], [])

    assert self.store.get_node(fresh) is not None
    assert self.store.get_node("/test/file_0.py") is None

def test_search_nodes(self):
self.store.upsert_node(self._make_func_node("authenticate"))
self.store.upsert_node(self._make_func_node("authorize"))
Expand Down
4 changes: 2 additions & 2 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.