Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion code_review_graph/communities.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,9 @@ def store_communities(
# that are tightly coupled to the DB transaction lifecycle.
conn = store._conn

if conn.in_transaction:
logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
conn.rollback()
# Wrap in explicit transaction so the DELETE + INSERT + UPDATE
# sequence is atomic — no partial community data on crash.
conn.execute("BEGIN IMMEDIATE")
Expand All @@ -561,7 +564,8 @@ def store_communities(
count = 0
for comm in communities:
cursor = conn.execute(
"""INSERT INTO communities (name, level, cohesion, size, dominant_language, description)
"""INSERT INTO communities
(name, level, cohesion, size, dominant_language, description)
VALUES (?, ?, ?, ?, ?, ?)""",
(
comm["name"],
Expand Down
3 changes: 3 additions & 0 deletions code_review_graph/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ def store_flows(store: GraphStore, flows: list[dict]) -> int:
# tightly coupled to the DB transaction lifecycle.
conn = store._conn

if conn.in_transaction:
logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
conn.rollback()
# Wrap the full DELETE + INSERT sequence in an explicit transaction
# so partial writes cannot occur if an exception interrupts the loop.
conn.execute("BEGIN IMMEDIATE")
Expand Down
12 changes: 3 additions & 9 deletions code_review_graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,9 @@ def store_file_nodes_edges(
self, file_path: str, nodes: list[NodeInfo], edges: list[EdgeInfo], fhash: str = ""
) -> None:
"""Atomically replace all data for a file."""
# Defense-in-depth: flush any pending transaction before BEGIN
# IMMEDIATE. The root cause (implicit transactions from legacy
# isolation_level="") is fixed by setting isolation_level=None in
# __init__, but external code accessing _conn directly (e.g.
# _compute_summaries, flows.py, communities.py) could still leave
# a transaction open.
# See: https://github.com/tirth8205/code-review-graph/issues/135
if self._conn.in_transaction:
logger.warning("Flushing unexpected open transaction before BEGIN IMMEDIATE")
self._conn.commit()
logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
self._conn.rollback()
self._conn.execute("BEGIN IMMEDIATE")
try:
self.remove_file_data(file_path)
Expand Down Expand Up @@ -274,6 +267,7 @@ def commit(self) -> None:
self._conn.commit()

def rollback(self) -> None:
"""Rollback the current transaction."""
self._conn.rollback()

# --- Read operations ---
Expand Down
39 changes: 22 additions & 17 deletions code_review_graph/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,28 @@ def rebuild_fts_index(store: GraphStore) -> int:
# the FTS5 virtual table DDL, which is tightly coupled to SQLite internals.
conn = store._conn

# Drop and recreate the FTS table to avoid content-sync mismatch issues
conn.execute("DROP TABLE IF EXISTS nodes_fts")
conn.execute("""
CREATE VIRTUAL TABLE nodes_fts USING fts5(
name, qualified_name, file_path, signature,
tokenize='porter unicode61'
)
""")
conn.commit()

# Populate from nodes table
conn.execute("""
INSERT INTO nodes_fts(rowid, name, qualified_name, file_path, signature)
SELECT id, name, qualified_name, file_path, COALESCE(signature, '')
FROM nodes
""")
conn.commit()
if conn.in_transaction:
logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE")
conn.rollback()
conn.execute("BEGIN IMMEDIATE")
try:
# Drop and recreate the FTS table with content sync to match migration v5
conn.execute("DROP TABLE IF EXISTS nodes_fts")
conn.execute("""
CREATE VIRTUAL TABLE nodes_fts USING fts5(
name, qualified_name, file_path, signature,
content='nodes', content_rowid='rowid',
tokenize='porter unicode61'
)
""")

# Rebuild from the content table (nodes) using the FTS5 rebuild command
conn.execute("INSERT INTO nodes_fts(nodes_fts) VALUES('rebuild')")

conn.commit()
except BaseException:
conn.rollback()
raise

count = conn.execute("SELECT count(*) FROM nodes_fts").fetchone()[0]
logger.info("FTS index rebuilt: %d rows indexed", count)
Expand Down
3 changes: 3 additions & 0 deletions code_review_graph/tools/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ def run_postprocess(
fts_count = rebuild_fts_index(store)
result["fts_indexed"] = fts_count
except (sqlite3.OperationalError, ImportError) as e:
store.rollback()
logger.warning("FTS index rebuild failed: %s", e)
warnings.append(f"FTS index rebuild failed: {type(e).__name__}: {e}")

Expand All @@ -419,6 +420,7 @@ def run_postprocess(
count = _store_flows(store, traced)
result["flows_detected"] = count
except (sqlite3.OperationalError, ImportError) as e:
store.rollback()
logger.warning("Flow detection failed: %s", e)
warnings.append(f"Flow detection failed: {type(e).__name__}: {e}")

Expand All @@ -435,6 +437,7 @@ def run_postprocess(
count = _store_communities(store, comms)
result["communities_detected"] = count
except (sqlite3.OperationalError, ImportError) as e:
store.rollback()
logger.warning("Community detection failed: %s", e)
warnings.append(f"Community detection failed: {type(e).__name__}: {e}")

Expand Down
75 changes: 75 additions & 0 deletions tests/test_fts_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Tests for FTS5 content sync robustness."""

import sqlite3
import tempfile
from pathlib import Path

import pytest

from code_review_graph.graph import GraphStore
from code_review_graph.parser import NodeInfo
from code_review_graph.search import rebuild_fts_index

@pytest.fixture
def store():
    """Yield a GraphStore backed by a throwaway on-disk SQLite file."""
    tmp_file = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
    tmp_file.close()  # release the OS handle; only the path is needed
    graph_store = GraphStore(tmp_file.name)
    yield graph_store
    # Teardown: close the store, then delete the backing file.
    graph_store.close()
    Path(tmp_file.name).unlink(missing_ok=True)

class TestFTSSync:
    def test_fts_rebuild_syncs_with_nodes(self, store):
        """rebuild_fts_index should index every row present in the nodes table."""
        nodes = [
            NodeInfo(
                kind="Function", name="calculate_total", file_path="app.py",
                line_start=1, line_end=5, language="python",
            ),
            NodeInfo(
                kind="Class", name="OrderProcessor", file_path="app.py",
                line_start=10, line_end=50, language="python",
            ),
        ]
        store.store_file_nodes_edges("app.py", nodes, [])

        # Rebuild and check the reported row count.
        assert rebuild_fts_index(store) == 2

        # Query the virtual table directly to confirm the content landed.
        matches = store._conn.execute(
            "SELECT name FROM nodes_fts WHERE name MATCH 'calculate*'"
        ).fetchall()
        assert len(matches) == 1
        assert matches[0]["name"] == "calculate_total"

    def test_fts_rebuild_clears_old_data(self, store):
        """A rebuild must drop stale FTS rows belonging to deleted nodes."""
        stale = NodeInfo(
            kind="Function", name="old_func", file_path="old.py",
            line_start=1, line_end=5, language="python",
        )
        store.store_file_nodes_edges("old.py", [stale], [])
        rebuild_fts_index(store)

        # Remove the original file's data entirely.
        store.remove_file_data("old.py")
        store.commit()

        fresh = NodeInfo(
            kind="Function", name="new_func", file_path="new.py",
            line_start=1, line_end=5, language="python",
        )
        store.store_file_nodes_edges("new.py", [fresh], [])

        # After the rebuild only the surviving node may be indexed.
        rebuild_fts_index(store)
        remaining = store._conn.execute("SELECT name FROM nodes_fts").fetchall()
        assert len(remaining) == 1
        assert remaining[0]["name"] == "new_func"
116 changes: 116 additions & 0 deletions tests/test_transactions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Tests for SQLite transaction robustness and nesting scenarios."""

import sqlite3
import tempfile
import logging
from pathlib import Path
from unittest.mock import patch

import pytest

from code_review_graph.graph import GraphStore
from code_review_graph.parser import NodeInfo, EdgeInfo
from code_review_graph.communities import store_communities
from code_review_graph.flows import store_flows

@pytest.fixture
def store():
    """Provide a fresh GraphStore on a temporary database file."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as handle:
        path = Path(handle.name)
    graph_store = GraphStore(str(path))
    yield graph_store
    # Teardown runs after the test: close the store and remove the file.
    graph_store.close()
    path.unlink(missing_ok=True)

class TestTransactionRobustness:
    def test_nested_transaction_guard_in_store_file(self, store, caplog):
        """store_file_nodes_edges must recover from a pre-existing open transaction."""
        conn = store._conn
        # Simulate a leaked transaction holding an uncommitted write.
        conn.execute("BEGIN")
        conn.execute("INSERT INTO metadata (key, value) VALUES (?, ?)", ("test", "val"))
        assert conn.in_transaction

        # The guard should warn, roll back the stray insert, and carry on.
        with caplog.at_level(logging.WARNING):
            store.store_file_nodes_edges("test.py", [], [])

        assert "Rolling back uncommitted transaction before BEGIN IMMEDIATE" in caplog.text
        assert not conn.in_transaction
        # The uncommitted metadata row must be gone.
        assert store.get_metadata("test") is None

    def test_atomic_community_storage(self, store):
        """store_communities must discard a stale open transaction and commit its own."""
        # Leak an open transaction with an uncommitted row.
        store._conn.execute("BEGIN")
        store._conn.execute("INSERT INTO metadata (key, value) VALUES ('leak', 'stale')")

        store_communities(store, [{"name": "comm1", "size": 1, "members": ["node1"]}])

        # The leaked write was rolled back...
        assert store.get_metadata("leak") is None
        # ...and the community row itself was committed.
        stored = store._conn.execute("SELECT count(*) FROM communities").fetchone()[0]
        assert stored == 1

    def test_atomic_flow_storage(self, store):
        """store_flows must discard a stale open transaction and commit its own."""
        flow = {
            "name": "flow1", "entry_point_id": 1, "depth": 1,
            "node_count": 1, "file_count": 1, "criticality": 0.5,
            "path": [1],
        }

        # Leak an open transaction with an uncommitted row.
        store._conn.execute("BEGIN")
        store._conn.execute("INSERT INTO metadata (key, value) VALUES ('leak', 'stale')")

        store_flows(store, [flow])

        assert store.get_metadata("leak") is None
        assert store._conn.execute("SELECT count(*) FROM flows").fetchone()[0] == 1

    def test_rollback_on_failure_in_batch_ops(self, store):
        """A mid-batch failure must roll back new data and leave committed data intact."""
        # Commit a baseline file first.
        keeper = NodeInfo(
            kind="File", name="keep", file_path="keep.py",
            line_start=1, line_end=10, language="python",
        )
        store.store_file_nodes_edges("keep.py", [keeper], [])

        doomed = NodeInfo(
            kind="File", name="fail", file_path="fail.py",
            line_start=1, line_end=10, language="python",
        )

        # Force the write path to blow up mid-transaction.
        with patch.object(store, 'upsert_node', side_effect=Exception("Simulated failure")):
            with pytest.raises(Exception, match="Simulated failure"):
                store.store_file_nodes_edges("fail.py", [doomed], [])

        # Nothing from the failed batch should have persisted.
        assert len(store.get_nodes_by_file("fail.py")) == 0
        # The earlier commit must be untouched.
        assert len(store.get_nodes_by_file("keep.py")) == 1

    def test_public_rollback_api(self, store):
        """GraphStore.rollback() must end the transaction and discard pending writes."""
        conn = store._conn
        conn.execute("BEGIN")
        conn.execute("INSERT INTO metadata (key, value) VALUES ('rollback', 'me')")
        assert conn.in_transaction

        store.rollback()

        assert not conn.in_transaction
        assert store.get_metadata("rollback") is None