diff --git a/docker-compose.abort-reason.yml b/docker-compose.abort-reason.yml new file mode 100644 index 0000000..010437a --- /dev/null +++ b/docker-compose.abort-reason.yml @@ -0,0 +1,41 @@ +# Multi-group, no-ACL cluster for the live transaction-abort-reason tests +# (io.dgraph.AbortReasonLiveTest). Two alpha groups (replicas=1) enable the +# predicate-move case; the single zero is restartable for the stale-startts case. +# +# Usage: +# make local-image # build dgraph/dgraph:local from this branch +# docker compose -f docker-compose.abort-reason.yml up -d +# TEST_GRPC_PORT=9180 \ +# TEST_ZERO_HTTP=localhost:6180 \ +# TEST_ZERO_RESTART_CMD="docker compose -f docker-compose.abort-reason.yml restart zero1" \ +# ./gradlew test --tests io.dgraph.AbortReasonLiveTest +# docker compose -f docker-compose.abort-reason.yml down +version: "3.5" +services: + zero1: + image: dgraph/dgraph:local + container_name: ar_zero1 + ports: + - 5180:5180 + - 6180:6180 + command: dgraph zero -o 100 --my=zero1:5180 --replicas=1 --logtostderr -v=2 --bindall + + alpha1: + image: dgraph/dgraph:local + container_name: ar_alpha1 + ports: + - 8180:8180 + - 9180:9180 + command: + dgraph alpha -o 100 --my=alpha1:7180 --zero=zero1:5180 --logtostderr -v=2 --raft "idx=1; + group=1" --security "whitelist=0.0.0.0/0;" + + alpha2: + image: dgraph/dgraph:local + container_name: ar_alpha2 + ports: + - 8182:8182 + - 9182:9182 + command: + dgraph alpha -o 102 --my=alpha2:7182 --zero=zero1:5180 --logtostderr -v=2 --raft "idx=2; + group=2" --security "whitelist=0.0.0.0/0;" diff --git a/src/main/java/io/dgraph/TxnConflictException.java b/src/main/java/io/dgraph/TxnConflictException.java index f6a4e66..b963ba6 100644 --- a/src/main/java/io/dgraph/TxnConflictException.java +++ b/src/main/java/io/dgraph/TxnConflictException.java @@ -15,6 +15,27 @@ public class TxnConflictException extends TxnException { private static final long serialVersionUID = 1L; + /** + * The category of a transaction abort, as reported by the Dgraph server. + * + *
: "}
+ * prefix on the gRPC status description; this method parses that prefix. Against a server that does
+ * not report a reason (older versions), this returns {@link AbortReason#UNKNOWN}. The full
+ * human-readable description remains available via {@link #getMessage()}.
+ */
+ public AbortReason getReason() {
+ String desc = getStatus().getDescription();
+ if (desc == null) {
+ return AbortReason.UNKNOWN;
+ }
+ int colon = desc.indexOf(':');
+ String code = (colon >= 0 ? desc.substring(0, colon) : desc).trim().toLowerCase();
+ switch (code) {
+ case "conflict":
+ return AbortReason.CONFLICT;
+ case "predicate-move":
+ return AbortReason.PREDICATE_MOVE;
+ case "stale-startts":
+ return AbortReason.STALE_STARTTS;
+ default:
+ return AbortReason.UNKNOWN;
+ }
+ }
+
@Override
public boolean isRetryable() {
return true;
diff --git a/src/test/java/io/dgraph/AbortReasonLiveTest.java b/src/test/java/io/dgraph/AbortReasonLiveTest.java
new file mode 100644
index 0000000..2a85334
--- /dev/null
+++ b/src/test/java/io/dgraph/AbortReasonLiveTest.java
@@ -0,0 +1,272 @@
+/*
+ * SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.dgraph;
+
+import static org.testng.Assert.*;
+
+import com.google.gson.Gson;
+import com.google.protobuf.ByteString;
+import io.dgraph.DgraphProto.Mutation;
+import io.dgraph.DgraphProto.Operation;
+import io.dgraph.DgraphProto.Response;
+import io.dgraph.TxnConflictException.AbortReason;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.testng.SkipException;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Live cross-language end-to-end proof for the transaction-abort reason. Unlike {@link
+ * AbortReasonTest}, which feeds synthetic gRPC statuses into the parser, this test drives a real
+ * (locally patched) Dgraph server and asserts that each abort category propagates all the way to
+ * {@link TxnConflictException#getReason()}. This closes the loop the unit tests cannot — proving the
+ * server actually emits the categorized reason on the wire and the client parses it.
+ *
+ * Configuration (system properties or environment variables):
+ *
+ *
+ * - {@code dgraph.test.host} / {@code TEST_HOSTNAME} — alpha host (default {@code localhost})
+ *
- {@code dgraph.test.port} / {@code TEST_GRPC_PORT} — alpha gRPC port (default {@code 9180})
+ *
- {@code dgraph.test.zeroHttp} / {@code TEST_ZERO_HTTP} — zero HTTP admin (e.g. {@code
+ * localhost:6180}); enables the predicate-move test (needs a multi-group cluster)
+ *
- {@code dgraph.test.zeroRestartCmd} / {@code TEST_ZERO_RESTART_CMD} — shell command that
+ * restarts Zero; enables the stale-startts test
+ *
+ *
+ * Tests whose infrastructure is not configured are skipped (not failed), so the file is safe in the
+ * default run. Run explicitly with {@code --tests io.dgraph.AbortReasonLiveTest}.
+ */
+public class AbortReasonLiveTest {
+ private static final String HOST = conf("dgraph.test.host", "TEST_HOSTNAME", "localhost");
+ private static final int PORT =
+ Integer.parseInt(conf("dgraph.test.port", "TEST_GRPC_PORT", "9180"));
+ private static final String ZERO_HTTP = conf("dgraph.test.zeroHttp", "TEST_ZERO_HTTP", null);
+ private static final String ZERO_RESTART_CMD =
+ conf("dgraph.test.zeroRestartCmd", "TEST_ZERO_RESTART_CMD", null);
+
+ private static ManagedChannel channel;
+ private static DgraphClient client;
+
+ private static String conf(String prop, String env, String dflt) {
+ String v = System.getProperty(prop);
+ if (v == null || v.isEmpty()) {
+ v = System.getenv(env);
+ }
+ return (v == null || v.isEmpty()) ? dflt : v;
+ }
+
+ @BeforeClass
+ public static void before() {
+ channel = ManagedChannelBuilder.forAddress(HOST, PORT).usePlaintext().build();
+ client = new DgraphClient(DgraphGrpc.newStub(channel));
+ client.alter(Operation.newBuilder().setDropAll(true).build());
+ }
+
+ @AfterClass
+ public static void after() throws InterruptedException {
+ if (channel != null) {
+ channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void liveConflictReportsConflictReason() {
+ // txn1 creates a node with a name.
+ Transaction txn1 = client.newTransaction();
+ Mutation mu1 =
+ Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8("{\"name\": \"Manish\"}")).build();
+ Response assigned = txn1.mutate(mu1);
+ assertEquals(assigned.getUidsMap().size(), 1, "expected exactly one assigned uid");
+ String uid = assigned.getUidsMap().values().iterator().next();
+
+ // txn2 writes the same predicate on the same uid -> conflicts.
+ Transaction txn2 = client.newTransaction();
+ Mutation mu2 =
+ Mutation.newBuilder()
+ .setSetJson(ByteString.copyFromUtf8("{\"uid\": \"" + uid + "\", \"name\": \"Manish\"}"))
+ .build();
+ txn2.mutate(mu2);
+
+ // First commit wins; its commitTs is now greater than txn2's startTs.
+ txn1.commit();
+
+ // Second commit must abort with the CONFLICT category, still retryable, with the
+ // full "conflict: ..." server message preserved.
+ try {
+ txn2.commit();
+ fail("expected the conflicting second commit to throw TxnConflictException");
+ } catch (TxnConflictException e) {
+ assertEquals(
+ e.getReason(),
+ AbortReason.CONFLICT,
+ "server-reported reason should parse to CONFLICT; full message: " + e.getMessage());
+ assertTrue(e.isRetryable(), "conflict aborts are retryable");
+ assertTrue(
+ e.getMessage().contains("conflict:"),
+ "full categorized server message should be preserved; got: " + e.getMessage());
+ }
+ }
+
+ /**
+ * A transaction's start ts becomes "stale" when it predates the current Zero leader's lease — i.e.
+ * after a leader change. We force that by opening a transaction and then restarting Zero (via the
+ * configured command): on restart Zero renews its lease and advances startTxnTs past every
+ * previously-leased start ts, so committing the now-old txn aborts with STALE_STARTTS.
+ */
+ @Test
+ public void liveStaleStartTsReportsStaleReason() throws Exception {
+ if (ZERO_RESTART_CMD == null) {
+ throw new SkipException(
+ "set dgraph.test.zeroRestartCmd / TEST_ZERO_RESTART_CMD to restart Zero");
+ }
+
+ // Open a transaction so it gets a start ts that the restart will invalidate.
+ Transaction txn = client.newTransaction();
+ txn.mutate(
+ Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8("{\"name\": \"Manish\"}")).build());
+
+ // Restart Zero; sleeps give the leader time to re-establish (lease renewal, hence the
+ // startTxnTs bump, runs on becoming leader).
+ runShell(ZERO_RESTART_CMD);
+ Thread.sleep(8000);
+
+ try {
+ txn.commit();
+ fail("expected the stale commit to throw TxnConflictException");
+ } catch (TxnConflictException e) {
+ assertEquals(
+ e.getReason(),
+ AbortReason.STALE_STARTTS,
+ "server-reported reason should parse to STALE_STARTTS; full message: " + e.getMessage());
+ assertTrue(
+ e.getMessage().contains("stale-startts:"),
+ "full categorized server message should be preserved; got: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Moving a predicate's tablet to another group rejects commits that mutated it on the old group.
+ * We mutate "name" while its tablet is on the source group, move the tablet, then commit: the
+ * commit's predicate keys reference the old group, so Zero's checkPreds rejects it with the
+ * PREDICATE_MOVE category.
+ */
+ @Test
+ public void livePredicateMoveReportsPredicateMoveReason() throws Exception {
+ if (ZERO_HTTP == null) {
+ throw new SkipException(
+ "set dgraph.test.zeroHttp / TEST_ZERO_HTTP and run a multi-group cluster");
+ }
+
+ client.alter(Operation.newBuilder().setSchema("name: string @index(exact) .").build());
+
+ // Seed so the "name" tablet exists and settles on some group.
+ Transaction seed = client.newTransaction();
+ seed.mutate(
+ Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8("{\"name\": \"seed\"}")).build());
+ seed.commit();
+ Thread.sleep(1000);
+
+ String src = groupOf("name");
+ if (src == null || groupCount() < 2) {
+ throw new SkipException("need a multi-group cluster serving predicate 'name'");
+ }
+ String dst = src.equals("1") ? "2" : "1";
+
+ // Mutate "name" while it is on `src` (the txn's predicate keys reference `src`), don't commit.
+ Transaction txn = client.newTransaction();
+ txn.mutate(
+ Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8("{\"name\": \"Manish\"}")).build());
+
+ // Move the tablet and wait for the move to complete.
+ httpGet("http://" + ZERO_HTTP + "/moveTablet?tablet=name&group=" + dst);
+ long deadline = System.currentTimeMillis() + 60_000;
+ while (System.currentTimeMillis() < deadline && !dst.equals(groupOf("name"))) {
+ Thread.sleep(1000);
+ }
+ assertEquals(groupOf("name"), dst, "tablet move did not complete");
+
+ try {
+ txn.commit();
+ fail("expected the post-move commit to throw TxnConflictException");
+ } catch (TxnConflictException e) {
+ assertEquals(
+ e.getReason(),
+ AbortReason.PREDICATE_MOVE,
+ "server-reported reason should parse to PREDICATE_MOVE; full message: " + e.getMessage());
+ assertTrue(
+ e.getMessage().contains("predicate-move:"),
+ "full categorized server message should be preserved; got: " + e.getMessage());
+ }
+ }
+
+ // --- helpers ---
+
+ @SuppressWarnings("unchecked")
+ private static Map zeroState() throws Exception {
+ String body = httpGet("http://" + ZERO_HTTP + "/state");
+ return new Gson().fromJson(body, Map.class);
+ }
+
+ /** Returns the group id serving the given predicate (matching the namespaced tablet key). */
+ @SuppressWarnings("unchecked")
+ private static String groupOf(String pred) throws Exception {
+ Map groups = (Map) zeroState().get("groups");
+ if (groups == null) {
+ return null;
+ }
+ for (Map.Entry e : groups.entrySet()) {
+ Object tabletsObj = ((Map) e.getValue()).get("tablets");
+ if (tabletsObj instanceof Map) {
+ for (String tablet : ((Map) tabletsObj).keySet()) {
+ if (tablet.equals(pred) || tablet.endsWith("-" + pred)) {
+ return e.getKey();
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static int groupCount() throws Exception {
+ Map groups = (Map) zeroState().get("groups");
+ return groups == null ? 0 : groups.size();
+ }
+
+ private static String httpGet(String urlStr) throws Exception {
+ HttpURLConnection conn = (HttpURLConnection) new URL(urlStr).openConnection();
+ conn.setRequestMethod("GET");
+ try (BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
+ StringBuilder sb = new StringBuilder();
+ String line;
+ while ((line = rd.readLine()) != null) {
+ sb.append(line);
+ }
+ return sb.toString();
+ } finally {
+ conn.disconnect();
+ }
+ }
+
+ private static void runShell(String cmd) throws Exception {
+ Process p = new ProcessBuilder("bash", "-c", cmd).inheritIO().start();
+ if (!p.waitFor(60, TimeUnit.SECONDS)) {
+ p.destroyForcibly();
+ throw new RuntimeException("zero restart command timed out: " + cmd);
+ }
+ if (p.exitValue() != 0) {
+ throw new RuntimeException("zero restart command failed (" + p.exitValue() + "): " + cmd);
+ }
+ }
+}
diff --git a/src/test/java/io/dgraph/AbortReasonTest.java b/src/test/java/io/dgraph/AbortReasonTest.java
new file mode 100644
index 0000000..8159d8d
--- /dev/null
+++ b/src/test/java/io/dgraph/AbortReasonTest.java
@@ -0,0 +1,133 @@
+/*
+ * SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.dgraph;
+
+import static org.testng.Assert.*;
+
+import io.dgraph.TxnConflictException.AbortReason;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for surfacing the transaction-abort reason to the client. The Dgraph server encodes the
+ * abort category as a {@code ": "} prefix on the gRPC ABORTED status; these tests
+ * verify that {@link TxnConflictException#getReason()} parses it, that the mapping from a raw gRPC
+ * error goes through {@link Exceptions#translate}, and that behavior degrades gracefully against a
+ * server that reports no reason.
+ */
+public class AbortReasonTest {
+
+ private StatusRuntimeException aborted(String description) {
+ return Status.Code.ABORTED.toStatus().withDescription(description).asRuntimeException();
+ }
+
+ private TxnConflictException conflictExceptionFor(String description) {
+ DgraphException ex = Exceptions.translate(aborted(description));
+ assertTrue(
+ ex instanceof TxnConflictException,
+ "ABORTED status should map to TxnConflictException, got " + ex.getClass());
+ return (TxnConflictException) ex;
+ }
+
+ // --- Reason categorization (the three server-reported categories) ---
+
+ @Test
+ public void testConflictReason() {
+ TxnConflictException ex =
+ conflictExceptionFor("conflict: Transaction has been aborted. Please retry");
+ assertEquals(ex.getReason(), AbortReason.CONFLICT);
+ assertTrue(ex.isRetryable());
+ }
+
+ @Test
+ public void testPredicateMoveReason() {
+ TxnConflictException ex =
+ conflictExceptionFor(
+ "predicate-move: Commits on predicate name are blocked due to predicate move");
+ assertEquals(ex.getReason(), AbortReason.PREDICATE_MOVE);
+ assertTrue(ex.isRetryable());
+ }
+
+ @Test
+ public void testStaleStartTsReason() {
+ TxnConflictException ex =
+ conflictExceptionFor(
+ "stale-startts: Transaction has been aborted due to a leader change. Please retry");
+ assertEquals(ex.getReason(), AbortReason.STALE_STARTTS);
+ assertTrue(ex.isRetryable());
+ }
+
+ // --- Full message preserved alongside the parsed reason (backward compatibility) ---
+
+ @Test
+ public void testFullMessageIsPreserved() {
+ String desc = "conflict: Transaction has been aborted. Please retry";
+ TxnConflictException ex = conflictExceptionFor(desc);
+ // getMessage() still exposes the complete human-readable description.
+ assertTrue(ex.getMessage().contains(desc));
+ assertEquals(ex.getStatus().getDescription(), desc);
+ }
+
+ // --- Graceful degradation against an older server (no reason prefix) ---
+
+ @Test
+ public void testLegacyMessageDegradesToUnknown() {
+ // Pre-feature servers emit the bare static string with no category prefix.
+ TxnConflictException ex = conflictExceptionFor("Transaction has been aborted. Please retry");
+ assertEquals(ex.getReason(), AbortReason.UNKNOWN);
+ assertTrue(ex.isRetryable());
+ }
+
+ @Test
+ public void testUnrecognizedPrefixDegradesToUnknown() {
+ TxnConflictException ex = conflictExceptionFor("something-else: not a known category");
+ assertEquals(ex.getReason(), AbortReason.UNKNOWN);
+ }
+
+ @Test
+ public void testNullDescriptionIsUnknown() {
+ DgraphException ex = Exceptions.translate(Status.ABORTED.asRuntimeException());
+ assertTrue(ex instanceof TxnConflictException);
+ assertEquals(((TxnConflictException) ex).getReason(), AbortReason.UNKNOWN);
+ }
+
+ // --- Parsing robustness ---
+
+ @Test
+ public void testReasonIsCaseInsensitiveAndTrimmed() {
+ assertEquals(conflictExceptionFor("CONFLICT: x").getReason(), AbortReason.CONFLICT);
+ assertEquals(conflictExceptionFor(" predicate-move : y").getReason(), AbortReason.PREDICATE_MOVE);
+ }
+
+ @Test
+ public void testReasonWithoutDetailStillParses() {
+ // A bare code with no ": detail" suffix should still categorize.
+ assertEquals(conflictExceptionFor("conflict").getReason(), AbortReason.CONFLICT);
+ }
+
+ // --- The constructor used elsewhere in the client still works ---
+
+ @Test
+ public void testStringConstructorReason() {
+ TxnConflictException ex = new TxnConflictException("conflict: manual");
+ assertEquals(ex.getReason(), AbortReason.CONFLICT);
+ assertTrue(ex.isRetryable());
+ }
+
+ @Test
+ public void testFailedPreconditionAlsoCarriesReason() {
+ // FAILED_PRECONDITION also maps to TxnConflictException; reason parsing applies there too.
+ DgraphException ex =
+ Exceptions.translate(
+ Status.Code.FAILED_PRECONDITION
+ .toStatus()
+ .withDescription("conflict: Transaction conflict")
+ .asRuntimeException());
+ assertTrue(ex instanceof TxnConflictException);
+ assertEquals(((TxnConflictException) ex).getReason(), AbortReason.CONFLICT);
+ }
+}