diff --git a/pydgraph/__init__.py b/pydgraph/__init__.py index 19996fb..4314303 100755 --- a/pydgraph/__init__.py +++ b/pydgraph/__init__.py @@ -13,6 +13,7 @@ from pydgraph.client_stub import DgraphClientStub, client_stub from pydgraph.errors import ( AbortedError, + AbortReason, ConnectionError, # noqa: A004 RetriableError, TransactionError, @@ -44,6 +45,7 @@ from pydgraph.txn import Txn __all__ = [ + "AbortReason", "AbortedError", "AsyncDgraphClient", "AsyncDgraphClientStub", diff --git a/pydgraph/async_txn.py b/pydgraph/async_txn.py index a7cbd47..58dcb12 100644 --- a/pydgraph/async_txn.py +++ b/pydgraph/async_txn.py @@ -341,7 +341,7 @@ def _common_except_mutate(error: Exception) -> NoReturn: The original error otherwise """ if util.is_aborted_error(error): - raise errors.AbortedError + raise errors.AbortedError(util.abort_error_message(error)) if util.is_retriable_error(error): raise errors.RetriableError(error) @@ -437,7 +437,7 @@ def _common_except_commit(error: Exception) -> NoReturn: The original error otherwise """ if util.is_aborted_error(error): - raise errors.AbortedError + raise errors.AbortedError(util.abort_error_message(error)) raise error diff --git a/pydgraph/errors.py b/pydgraph/errors.py index 611a906..8447c00 100644 --- a/pydgraph/errors.py +++ b/pydgraph/errors.py @@ -5,6 +5,8 @@ from __future__ import annotations +import enum + from pydgraph.meta import VERSION __author__ = "Garvit Pahal" @@ -13,13 +15,63 @@ __status__ = "development" +class AbortReason(enum.Enum): + """The category of a transaction abort, as reported by the Dgraph server. + + The server encodes the reason as a ``": "`` prefix on the gRPC + ABORTED status message; :func:`parse_abort_reason` maps that prefix to one of + these values. + + - ``CONFLICT`` — a write-write conflict with another concurrent transaction; + retrying with a fresh transaction is the expected response. + - ``PREDICATE_MOVE`` — a predicate is being moved between groups and commits on + it are temporarily blocked; back off and retry once the move completes. + - ``STALE_STARTTS`` — the transaction's start timestamp predates the current Zero + leader (a leader change); retry with a fresh transaction. + - ``UNKNOWN`` — no reason was reported. Returned for aborts from older servers + that do not yet categorize the reason, so callers degrade gracefully. + """ + + CONFLICT = "conflict" + PREDICATE_MOVE = "predicate-move" + STALE_STARTTS = "stale-startts" + UNKNOWN = "unknown" + + +def parse_abort_reason(message: str | None) -> AbortReason: + """Parses the abort category from a server abort message. + + The reason is the ``": "`` prefix; matching is case-insensitive and + tolerant of surrounding whitespace. A message with no recognized prefix (e.g. from + a pre-feature server) returns :attr:`AbortReason.UNKNOWN`. + """ + if not message: + return AbortReason.UNKNOWN + code = message.split(":", 1)[0].strip().lower() + for reason in ( + AbortReason.CONFLICT, + AbortReason.PREDICATE_MOVE, + AbortReason.STALE_STARTTS, + ): + if code == reason.value: + return reason + return AbortReason.UNKNOWN + + class AbortedError(Exception): - """Error thrown by aborted transactions.""" + """Error thrown by aborted transactions. + + The parsed abort category is available as :attr:`reason`; the full server message + remains available via ``str(error)``. + """ def __init__( - self, message: str = "Transaction has been aborted. Please retry" + self, + message: str = "Transaction has been aborted. Please retry", + reason: AbortReason | None = None, ) -> None: super().__init__(message) + self.reason = reason if reason is not None else parse_abort_reason(message) class RetriableError(Exception): diff --git a/pydgraph/txn.py b/pydgraph/txn.py index 528be49..0e5a185 100644 --- a/pydgraph/txn.py +++ b/pydgraph/txn.py @@ -339,7 +339,7 @@ def create_request( @staticmethod def _common_except_mutate(error: Exception) -> None: if util.is_aborted_error(error): - raise errors.AbortedError + raise errors.AbortedError(util.abort_error_message(error)) if util.is_retriable_error(error): raise errors.RetriableError(error) @@ -399,7 +399,7 @@ def _common_commit(self) -> bool: @staticmethod def _common_except_commit(error: Exception) -> None: if util.is_aborted_error(error): - raise errors.AbortedError + raise errors.AbortedError(util.abort_error_message(error)) raise error diff --git a/pydgraph/util.py b/pydgraph/util.py index 6d6c41f..ec0231d 100644 --- a/pydgraph/util.py +++ b/pydgraph/util.py @@ -46,6 +46,20 @@ def is_aborted_error(error: Any) -> bool: return False +def abort_error_message(error: Any) -> str: + """Returns the server-supplied message from a gRPC error. + + Prefers ``error.details()`` (the status description, which carries the abort + reason prefix), falling back to ``str(error)``. Used to surface the categorized + abort reason on AbortedError. + """ + if hasattr(error, "details") and callable(error.details): + details = error.details() + if details: + return details + return str(error) + + def is_retriable_error(error: Exception) -> bool: """Returns true if the error is retriable (e.g server is not ready yet).""" msg = str(error) diff --git a/tests/docker-compose.multigroup.yml b/tests/docker-compose.multigroup.yml new file mode 100644 index 0000000..7b89593 --- /dev/null +++ b/tests/docker-compose.multigroup.yml @@ -0,0 +1,36 @@ +# Multi-group, no-ACL cluster for the live transaction-abort-reason tests +# (tests/test_abort_reason_live.py). Two alpha groups (replicas=1) enable the +# predicate-move case; the single zero is restartable for the stale-startts case. +# +# Usage: +# DGRAPH_IMAGE_TAG=local docker compose -f tests/docker-compose.multigroup.yml up -d +# TEST_SERVER_ADDR=localhost:9180 \ +# TEST_ZERO_HTTP=localhost:6180 \ +# TEST_ZERO_RESTART_CMD="docker compose -f tests/docker-compose.multigroup.yml restart zero1" \ +# python -m pytest tests/test_abort_reason_live.py -v +# docker compose -f tests/docker-compose.multigroup.yml down +services: + zero1: + image: dgraph/dgraph:${DGRAPH_IMAGE_TAG:-latest} + ports: + - "5180:5180" + - "6180:6180" + command: dgraph zero -o 100 --my=zero1:5180 --replicas=1 --logtostderr -v=2 --bindall + + alpha1: + image: dgraph/dgraph:${DGRAPH_IMAGE_TAG:-latest} + ports: + - "8180:8180" + - "9180:9180" + command: + dgraph alpha -o 100 --my=alpha1:7180 --zero=zero1:5180 --logtostderr -v=2 --raft "idx=1; + group=1" --security "whitelist=0.0.0.0/0;" + + alpha2: + image: dgraph/dgraph:${DGRAPH_IMAGE_TAG:-latest} + ports: + - "8182:8182" + - "9182:9182" + command: + dgraph alpha -o 102 --my=alpha2:7182 --zero=zero1:5180 --logtostderr -v=2 --raft "idx=2; + group=2" --security "whitelist=0.0.0.0/0;" diff --git a/tests/test_abort_reason.py b/tests/test_abort_reason.py new file mode 100644 index 0000000..dffda42 --- /dev/null +++ b/tests/test_abort_reason.py @@ -0,0 +1,83 @@ +# SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for surfacing the transaction-abort reason on AbortedError. + +The Dgraph server encodes the abort category as a ``": "`` prefix on the +gRPC ABORTED status message. These tests verify that the prefix is parsed into an +``AbortReason``, that the full message is preserved, and that aborts from a server which +reports no reason degrade gracefully to ``UNKNOWN``. +""" + +from __future__ import annotations + +import unittest + +import pydgraph +from pydgraph import errors + + +class TestAbortReason(unittest.TestCase): + # --- The three server-reported categories --- + + def test_conflict_reason(self) -> None: + err = errors.AbortedError("conflict: Transaction has been aborted. Please retry") + assert err.reason == pydgraph.AbortReason.CONFLICT + + def test_predicate_move_reason(self) -> None: + err = errors.AbortedError( + "predicate-move: Commits on predicate name are blocked due to predicate move" + ) + assert err.reason == pydgraph.AbortReason.PREDICATE_MOVE + + def test_stale_startts_reason(self) -> None: + err = errors.AbortedError( + "stale-startts: Transaction has been aborted due to a leader change. Please retry" + ) + assert err.reason == pydgraph.AbortReason.STALE_STARTTS + + # --- Full message preserved alongside the parsed reason --- + + def test_full_message_preserved(self) -> None: + desc = "conflict: Transaction has been aborted. Please retry" + err = errors.AbortedError(desc) + assert str(err) == desc + + # --- Graceful degradation against an older server (no reason prefix) --- + + def test_legacy_message_degrades_to_unknown(self) -> None: + # The default message (what older servers emit) has no category prefix. + err = errors.AbortedError() + assert err.reason == pydgraph.AbortReason.UNKNOWN + + def test_unrecognized_prefix_degrades_to_unknown(self) -> None: + err = errors.AbortedError("something-else: not a known category") + assert err.reason == pydgraph.AbortReason.UNKNOWN + + # --- Parsing robustness --- + + def test_reason_is_case_insensitive_and_trimmed(self) -> None: + assert errors.AbortedError("CONFLICT: x").reason == pydgraph.AbortReason.CONFLICT + assert ( + errors.AbortedError(" predicate-move : y").reason + == pydgraph.AbortReason.PREDICATE_MOVE + ) + + def test_reason_without_detail_still_parses(self) -> None: + assert errors.AbortedError("conflict").reason == pydgraph.AbortReason.CONFLICT + + def test_parse_abort_reason_none_is_unknown(self) -> None: + assert errors.parse_abort_reason(None) == pydgraph.AbortReason.UNKNOWN + assert errors.parse_abort_reason("") == pydgraph.AbortReason.UNKNOWN + + # --- Explicit reason overrides parsing --- + + def test_explicit_reason_overrides_message(self) -> None: + err = errors.AbortedError( + "opaque message", reason=pydgraph.AbortReason.STALE_STARTTS + ) + assert err.reason == pydgraph.AbortReason.STALE_STARTTS + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_abort_reason_live.py b/tests/test_abort_reason_live.py new file mode 100644 index 0000000..cd92b8f --- /dev/null +++ b/tests/test_abort_reason_live.py @@ -0,0 +1,167 @@ +# SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. +# SPDX-License-Identifier: Apache-2.0 + +"""Live cross-language end-to-end tests for the transaction-abort reason. + +Unlike tests/test_abort_reason.py, which feeds synthetic messages into the parser, these +drive a real (locally patched) Dgraph cluster and prove that each abort category travels +all the way to AbortedError.reason: + +- conflict — two concurrent transactions write the same predicate/uid. +- stale-startts — a transaction's start ts is invalidated by a Zero leader change; we + force that by restarting Zero (TEST_ZERO_CONTAINER) mid-transaction. +- predicate-move — a predicate's tablet is moved to another group after a transaction + mutated it; the post-move commit is rejected. Requires a multi-group + cluster and the Zero admin HTTP endpoint (TEST_ZERO_HTTP). + +Each test skips cleanly when the infrastructure it needs is not configured, so the file is +safe to include in the default suite. Configure via env vars: + + TEST_SERVER_ADDR alpha gRPC (default localhost:9180) + TEST_ZERO_HTTP zero HTTP admin, e.g. localhost:6180 (enables predicate-move) + TEST_ZERO_CONTAINER docker/podman container name of Zero (enables stale-startts restart) + TEST_ZERO_RESTART_CMD shell command that restarts Zero (alternative to the container + name, e.g. for a manually-launched cluster) +""" + +from __future__ import annotations + +import json +import os +import subprocess +import time +import unittest +import urllib.request + +import pydgraph + +SERVER_ADDR = os.getenv("TEST_SERVER_ADDR", "localhost:9180") +ZERO_HTTP = os.getenv("TEST_ZERO_HTTP") +ZERO_CONTAINER = os.getenv("TEST_ZERO_CONTAINER") +ZERO_RESTART_CMD = os.getenv("TEST_ZERO_RESTART_CMD") +DOCKER = os.getenv("TEST_DOCKER_CMD", "docker") + + +def _restart_zero() -> None: + if ZERO_RESTART_CMD: + subprocess.run(ZERO_RESTART_CMD, shell=True, check=True, timeout=60) # noqa: S602 + else: + subprocess.run([DOCKER, "restart", ZERO_CONTAINER], check=True, timeout=60) # noqa: S603 + + +def _client() -> pydgraph.DgraphClient: + return pydgraph.DgraphClient(pydgraph.DgraphClientStub(SERVER_ADDR)) + + +def _zero_state() -> dict: + with urllib.request.urlopen(f"http://{ZERO_HTTP}/state", timeout=5) as resp: + return json.loads(resp.read().decode()) + + +def _find_tablet(pred: str) -> tuple[str, str] | None: + """Returns (group_id, tablet_key) for the given predicate, or None. + + Tablet keys are namespace-prefixed (e.g. "0-name"), so we match either the bare + predicate or a "-" key. + """ + for gid, group in _zero_state().get("groups", {}).items(): + for tablet in group.get("tablets") or {}: + if tablet == pred or tablet.endswith("-" + pred): + return gid, tablet + return None + + +def _group_of(pred: str) -> str | None: + found = _find_tablet(pred) + return found[0] if found else None + + +def _move_tablet(pred: str, group: str) -> None: + # moveTablet takes the bare predicate (namespace handled server-side), not the + # namespace-prefixed tablet key that /state reports. + url = f"http://{ZERO_HTTP}/moveTablet?tablet={pred}&group={group}" + with urllib.request.urlopen(url, timeout=30) as resp: # noqa: S310 + resp.read() + + +class TestAbortReasonLive(unittest.TestCase): + def setUp(self) -> None: + self.client = _client() + self.client.alter(pydgraph.Operation(drop_all=True)) + + def test_conflict_reports_conflict_reason(self) -> None: + txn = self.client.txn() + resp = txn.mutate(set_obj={"name": "Manish"}) + uid = next(iter(resp.uids.values())) + + txn2 = self.client.txn() + txn2.mutate(set_obj={"uid": uid, "name": "Manish"}) + + txn.commit() # winner + + with self.assertRaises(pydgraph.AbortedError) as ctx: + txn2.commit() + assert ctx.exception.reason == pydgraph.AbortReason.CONFLICT + assert "conflict" in str(ctx.exception) + + @unittest.skipUnless( + ZERO_CONTAINER or ZERO_RESTART_CMD, + "set TEST_ZERO_CONTAINER or TEST_ZERO_RESTART_CMD to restart Zero", + ) + def test_stale_startts_reports_stale_reason(self) -> None: + # Open a transaction so it gets a start ts that the restart will invalidate. + txn = self.client.txn() + txn.mutate(set_obj={"name": "Manish"}) + + # Restart Zero; on coming back it renews its lease and advances startTxnTs past the + # start ts above, making this transaction stale. Sleeps give the leader time to + # re-establish (lease renewal runs on becoming leader). + _restart_zero() + time.sleep(8) + + with self.assertRaises(pydgraph.AbortedError) as ctx: + txn.commit() + assert ctx.exception.reason == pydgraph.AbortReason.STALE_STARTTS + assert "stale-startts" in str(ctx.exception) + + @unittest.skipUnless( + ZERO_HTTP, "set TEST_ZERO_HTTP and run a multi-group cluster for predicate-move" + ) + def test_predicate_move_reports_predicate_move_reason(self) -> None: + self.client.alter(pydgraph.Operation(schema="name: string @index(exact) .")) + + # Seed so the "name" tablet exists and settles on some group. + seed = self.client.txn() + seed.mutate(set_obj={"name": "seed"}) + seed.commit() + time.sleep(1) + + found = _find_tablet("name") + groups = sorted(_zero_state().get("groups", {}).keys()) + if found is None or len(groups) < 2: + self.skipTest("need a multi-group cluster serving predicate 'name'") + src, _tablet_key = found + dst = next(g for g in groups if g != src) + + # Mutate "name" while it is on `src` (the txn's Preds will reference `src`), but + # do not commit yet. + txn = self.client.txn() + txn.mutate(set_obj={"name": "Manish"}) + + # Move the tablet to another group and wait for the move to complete. + _move_tablet("name", dst) + deadline = time.time() + 60 + while time.time() < deadline and _group_of("name") != dst: + time.sleep(1) + assert _group_of("name") == dst, "tablet move did not complete" + + # Committing now: the txn mutated on `src` but the tablet is on `dst`, so Zero's + # checkPreds rejects the commit with the predicate-move category. + with self.assertRaises(pydgraph.AbortedError) as ctx: + txn.commit() + assert ctx.exception.reason == pydgraph.AbortReason.PREDICATE_MOVE + assert "predicate-move" in str(ctx.exception) + + +if __name__ == "__main__": + unittest.main()