diff --git a/docker-compose.abort-reason.yml b/docker-compose.abort-reason.yml new file mode 100644 index 0000000..010437a --- /dev/null +++ b/docker-compose.abort-reason.yml @@ -0,0 +1,41 @@ +# Multi-group, no-ACL cluster for the live transaction-abort-reason tests +# (io.dgraph.AbortReasonLiveTest). Two alpha groups (replicas=1) enable the +# predicate-move case; the single zero is restartable for the stale-startts case. +# +# Usage: +# make local-image # build dgraph/dgraph:local from this branch +# docker compose -f docker-compose.abort-reason.yml up -d +# TEST_GRPC_PORT=9180 \ +# TEST_ZERO_HTTP=localhost:6180 \ +# TEST_ZERO_RESTART_CMD="docker compose -f docker-compose.abort-reason.yml restart zero1" \ +# ./gradlew test --tests io.dgraph.AbortReasonLiveTest +# docker compose -f docker-compose.abort-reason.yml down +version: "3.5" +services: + zero1: + image: dgraph/dgraph:local + container_name: ar_zero1 + ports: + - 5180:5180 + - 6180:6180 + command: dgraph zero -o 100 --my=zero1:5180 --replicas=1 --logtostderr -v=2 --bindall + + alpha1: + image: dgraph/dgraph:local + container_name: ar_alpha1 + ports: + - 8180:8180 + - 9180:9180 + command: + dgraph alpha -o 100 --my=alpha1:7180 --zero=zero1:5180 --logtostderr -v=2 --raft "idx=1; + group=1" --security "whitelist=0.0.0.0/0;" + + alpha2: + image: dgraph/dgraph:local + container_name: ar_alpha2 + ports: + - 8182:8182 + - 9182:9182 + command: + dgraph alpha -o 102 --my=alpha2:7182 --zero=zero1:5180 --logtostderr -v=2 --raft "idx=2; + group=2" --security "whitelist=0.0.0.0/0;" diff --git a/src/main/java/io/dgraph/TxnConflictException.java b/src/main/java/io/dgraph/TxnConflictException.java index f6a4e66..b963ba6 100644 --- a/src/main/java/io/dgraph/TxnConflictException.java +++ b/src/main/java/io/dgraph/TxnConflictException.java @@ -15,6 +15,27 @@ public class TxnConflictException extends TxnException { private static final long serialVersionUID = 1L; + /** + * The category of a transaction abort, as reported by the Dgraph server. + * + * + */ + public enum AbortReason { + CONFLICT, + PREDICATE_MOVE, + STALE_STARTTS, + UNKNOWN + } + public TxnConflictException(String msg) { super(Status.ABORTED.withDescription(msg), null); } @@ -23,6 +44,31 @@ public TxnConflictException(String msg) { super(status, trailers); } + /** + * Returns the category of this abort. The server encodes the reason as a {@code ": "} + * prefix on the gRPC status description; this method parses that prefix. Against a server that does + * not report a reason (older versions), this returns {@link AbortReason#UNKNOWN}. The full + * human-readable description remains available via {@link #getMessage()}. + */ + public AbortReason getReason() { + String desc = getStatus().getDescription(); + if (desc == null) { + return AbortReason.UNKNOWN; + } + int colon = desc.indexOf(':'); + String code = (colon >= 0 ? desc.substring(0, colon) : desc).trim().toLowerCase(); + switch (code) { + case "conflict": + return AbortReason.CONFLICT; + case "predicate-move": + return AbortReason.PREDICATE_MOVE; + case "stale-startts": + return AbortReason.STALE_STARTTS; + default: + return AbortReason.UNKNOWN; + } + } + @Override public boolean isRetryable() { return true; diff --git a/src/test/java/io/dgraph/AbortReasonLiveTest.java b/src/test/java/io/dgraph/AbortReasonLiveTest.java new file mode 100644 index 0000000..2a85334 --- /dev/null +++ b/src/test/java/io/dgraph/AbortReasonLiveTest.java @@ -0,0 +1,272 @@ +/* + * SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.dgraph; + +import static org.testng.Assert.*; + +import com.google.gson.Gson; +import com.google.protobuf.ByteString; +import io.dgraph.DgraphProto.Mutation; +import io.dgraph.DgraphProto.Operation; +import io.dgraph.DgraphProto.Response; +import io.dgraph.TxnConflictException.AbortReason; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Live cross-language end-to-end proof for the transaction-abort reason. Unlike {@link + * AbortReasonTest}, which feeds synthetic gRPC statuses into the parser, this test drives a real + * (locally patched) Dgraph server and asserts that each abort category propagates all the way to + * {@link TxnConflictException#getReason()}. This closes the loop the unit tests cannot — proving the + * server actually emits the categorized reason on the wire and the client parses it. + * + *

Configuration (system properties or environment variables): + * + *

+ * + * Tests whose infrastructure is not configured are skipped (not failed), so the file is safe in the + * default run. Run explicitly with {@code --tests io.dgraph.AbortReasonLiveTest}. + */ +public class AbortReasonLiveTest { + private static final String HOST = conf("dgraph.test.host", "TEST_HOSTNAME", "localhost"); + private static final int PORT = + Integer.parseInt(conf("dgraph.test.port", "TEST_GRPC_PORT", "9180")); + private static final String ZERO_HTTP = conf("dgraph.test.zeroHttp", "TEST_ZERO_HTTP", null); + private static final String ZERO_RESTART_CMD = + conf("dgraph.test.zeroRestartCmd", "TEST_ZERO_RESTART_CMD", null); + + private static ManagedChannel channel; + private static DgraphClient client; + + private static String conf(String prop, String env, String dflt) { + String v = System.getProperty(prop); + if (v == null || v.isEmpty()) { + v = System.getenv(env); + } + return (v == null || v.isEmpty()) ? dflt : v; + } + + @BeforeClass + public static void before() { + channel = ManagedChannelBuilder.forAddress(HOST, PORT).usePlaintext().build(); + client = new DgraphClient(DgraphGrpc.newStub(channel)); + client.alter(Operation.newBuilder().setDropAll(true).build()); + } + + @AfterClass + public static void after() throws InterruptedException { + if (channel != null) { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + } + + @Test + public void liveConflictReportsConflictReason() { + // txn1 creates a node with a name. + Transaction txn1 = client.newTransaction(); + Mutation mu1 = + Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8("{\"name\": \"Manish\"}")).build(); + Response assigned = txn1.mutate(mu1); + assertEquals(assigned.getUidsMap().size(), 1, "expected exactly one assigned uid"); + String uid = assigned.getUidsMap().values().iterator().next(); + + // txn2 writes the same predicate on the same uid -> conflicts. + Transaction txn2 = client.newTransaction(); + Mutation mu2 = + Mutation.newBuilder() + .setSetJson(ByteString.copyFromUtf8("{\"uid\": \"" + uid + "\", \"name\": \"Manish\"}")) + .build(); + txn2.mutate(mu2); + + // First commit wins; its commitTs is now greater than txn2's startTs. + txn1.commit(); + + // Second commit must abort with the CONFLICT category, still retryable, with the + // full "conflict: ..." server message preserved. + try { + txn2.commit(); + fail("expected the conflicting second commit to throw TxnConflictException"); + } catch (TxnConflictException e) { + assertEquals( + e.getReason(), + AbortReason.CONFLICT, + "server-reported reason should parse to CONFLICT; full message: " + e.getMessage()); + assertTrue(e.isRetryable(), "conflict aborts are retryable"); + assertTrue( + e.getMessage().contains("conflict:"), + "full categorized server message should be preserved; got: " + e.getMessage()); + } + } + + /** + * A transaction's start ts becomes "stale" when it predates the current Zero leader's lease — i.e. + * after a leader change. We force that by opening a transaction and then restarting Zero (via the + * configured command): on restart Zero renews its lease and advances startTxnTs past every + * previously-leased start ts, so committing the now-old txn aborts with STALE_STARTTS. + */ + @Test + public void liveStaleStartTsReportsStaleReason() throws Exception { + if (ZERO_RESTART_CMD == null) { + throw new SkipException( + "set dgraph.test.zeroRestartCmd / TEST_ZERO_RESTART_CMD to restart Zero"); + } + + // Open a transaction so it gets a start ts that the restart will invalidate. + Transaction txn = client.newTransaction(); + txn.mutate( + Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8("{\"name\": \"Manish\"}")).build()); + + // Restart Zero; sleeps give the leader time to re-establish (lease renewal, hence the + // startTxnTs bump, runs on becoming leader). + runShell(ZERO_RESTART_CMD); + Thread.sleep(8000); + + try { + txn.commit(); + fail("expected the stale commit to throw TxnConflictException"); + } catch (TxnConflictException e) { + assertEquals( + e.getReason(), + AbortReason.STALE_STARTTS, + "server-reported reason should parse to STALE_STARTTS; full message: " + e.getMessage()); + assertTrue( + e.getMessage().contains("stale-startts:"), + "full categorized server message should be preserved; got: " + e.getMessage()); + } + } + + /** + * Moving a predicate's tablet to another group rejects commits that mutated it on the old group. + * We mutate "name" while its tablet is on the source group, move the tablet, then commit: the + * commit's predicate keys reference the old group, so Zero's checkPreds rejects it with the + * PREDICATE_MOVE category. + */ + @Test + public void livePredicateMoveReportsPredicateMoveReason() throws Exception { + if (ZERO_HTTP == null) { + throw new SkipException( + "set dgraph.test.zeroHttp / TEST_ZERO_HTTP and run a multi-group cluster"); + } + + client.alter(Operation.newBuilder().setSchema("name: string @index(exact) .").build()); + + // Seed so the "name" tablet exists and settles on some group. + Transaction seed = client.newTransaction(); + seed.mutate( + Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8("{\"name\": \"seed\"}")).build()); + seed.commit(); + Thread.sleep(1000); + + String src = groupOf("name"); + if (src == null || groupCount() < 2) { + throw new SkipException("need a multi-group cluster serving predicate 'name'"); + } + String dst = src.equals("1") ? "2" : "1"; + + // Mutate "name" while it is on `src` (the txn's predicate keys reference `src`), don't commit. + Transaction txn = client.newTransaction(); + txn.mutate( + Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8("{\"name\": \"Manish\"}")).build()); + + // Move the tablet and wait for the move to complete. + httpGet("http://" + ZERO_HTTP + "/moveTablet?tablet=name&group=" + dst); + long deadline = System.currentTimeMillis() + 60_000; + while (System.currentTimeMillis() < deadline && !dst.equals(groupOf("name"))) { + Thread.sleep(1000); + } + assertEquals(groupOf("name"), dst, "tablet move did not complete"); + + try { + txn.commit(); + fail("expected the post-move commit to throw TxnConflictException"); + } catch (TxnConflictException e) { + assertEquals( + e.getReason(), + AbortReason.PREDICATE_MOVE, + "server-reported reason should parse to PREDICATE_MOVE; full message: " + e.getMessage()); + assertTrue( + e.getMessage().contains("predicate-move:"), + "full categorized server message should be preserved; got: " + e.getMessage()); + } + } + + // --- helpers --- + + @SuppressWarnings("unchecked") + private static Map zeroState() throws Exception { + String body = httpGet("http://" + ZERO_HTTP + "/state"); + return new Gson().fromJson(body, Map.class); + } + + /** Returns the group id serving the given predicate (matching the namespaced tablet key). */ + @SuppressWarnings("unchecked") + private static String groupOf(String pred) throws Exception { + Map groups = (Map) zeroState().get("groups"); + if (groups == null) { + return null; + } + for (Map.Entry e : groups.entrySet()) { + Object tabletsObj = ((Map) e.getValue()).get("tablets"); + if (tabletsObj instanceof Map) { + for (String tablet : ((Map) tabletsObj).keySet()) { + if (tablet.equals(pred) || tablet.endsWith("-" + pred)) { + return e.getKey(); + } + } + } + } + return null; + } + + @SuppressWarnings("unchecked") + private static int groupCount() throws Exception { + Map groups = (Map) zeroState().get("groups"); + return groups == null ? 0 : groups.size(); + } + + private static String httpGet(String urlStr) throws Exception { + HttpURLConnection conn = (HttpURLConnection) new URL(urlStr).openConnection(); + conn.setRequestMethod("GET"); + try (BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()))) { + StringBuilder sb = new StringBuilder(); + String line; + while ((line = rd.readLine()) != null) { + sb.append(line); + } + return sb.toString(); + } finally { + conn.disconnect(); + } + } + + private static void runShell(String cmd) throws Exception { + Process p = new ProcessBuilder("bash", "-c", cmd).inheritIO().start(); + if (!p.waitFor(60, TimeUnit.SECONDS)) { + p.destroyForcibly(); + throw new RuntimeException("zero restart command timed out: " + cmd); + } + if (p.exitValue() != 0) { + throw new RuntimeException("zero restart command failed (" + p.exitValue() + "): " + cmd); + } + } +} diff --git a/src/test/java/io/dgraph/AbortReasonTest.java b/src/test/java/io/dgraph/AbortReasonTest.java new file mode 100644 index 0000000..8159d8d --- /dev/null +++ b/src/test/java/io/dgraph/AbortReasonTest.java @@ -0,0 +1,133 @@ +/* + * SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.dgraph; + +import static org.testng.Assert.*; + +import io.dgraph.TxnConflictException.AbortReason; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.testng.annotations.Test; + +/** + * Unit tests for surfacing the transaction-abort reason to the client. The Dgraph server encodes the + * abort category as a {@code ": "} prefix on the gRPC ABORTED status; these tests + * verify that {@link TxnConflictException#getReason()} parses it, that the mapping from a raw gRPC + * error goes through {@link Exceptions#translate}, and that behavior degrades gracefully against a + * server that reports no reason. + */ +public class AbortReasonTest { + + private StatusRuntimeException aborted(String description) { + return Status.Code.ABORTED.toStatus().withDescription(description).asRuntimeException(); + } + + private TxnConflictException conflictExceptionFor(String description) { + DgraphException ex = Exceptions.translate(aborted(description)); + assertTrue( + ex instanceof TxnConflictException, + "ABORTED status should map to TxnConflictException, got " + ex.getClass()); + return (TxnConflictException) ex; + } + + // --- Reason categorization (the three server-reported categories) --- + + @Test + public void testConflictReason() { + TxnConflictException ex = + conflictExceptionFor("conflict: Transaction has been aborted. Please retry"); + assertEquals(ex.getReason(), AbortReason.CONFLICT); + assertTrue(ex.isRetryable()); + } + + @Test + public void testPredicateMoveReason() { + TxnConflictException ex = + conflictExceptionFor( + "predicate-move: Commits on predicate name are blocked due to predicate move"); + assertEquals(ex.getReason(), AbortReason.PREDICATE_MOVE); + assertTrue(ex.isRetryable()); + } + + @Test + public void testStaleStartTsReason() { + TxnConflictException ex = + conflictExceptionFor( + "stale-startts: Transaction has been aborted due to a leader change. Please retry"); + assertEquals(ex.getReason(), AbortReason.STALE_STARTTS); + assertTrue(ex.isRetryable()); + } + + // --- Full message preserved alongside the parsed reason (backward compatibility) --- + + @Test + public void testFullMessageIsPreserved() { + String desc = "conflict: Transaction has been aborted. Please retry"; + TxnConflictException ex = conflictExceptionFor(desc); + // getMessage() still exposes the complete human-readable description. + assertTrue(ex.getMessage().contains(desc)); + assertEquals(ex.getStatus().getDescription(), desc); + } + + // --- Graceful degradation against an older server (no reason prefix) --- + + @Test + public void testLegacyMessageDegradesToUnknown() { + // Pre-feature servers emit the bare static string with no category prefix. + TxnConflictException ex = conflictExceptionFor("Transaction has been aborted. Please retry"); + assertEquals(ex.getReason(), AbortReason.UNKNOWN); + assertTrue(ex.isRetryable()); + } + + @Test + public void testUnrecognizedPrefixDegradesToUnknown() { + TxnConflictException ex = conflictExceptionFor("something-else: not a known category"); + assertEquals(ex.getReason(), AbortReason.UNKNOWN); + } + + @Test + public void testNullDescriptionIsUnknown() { + DgraphException ex = Exceptions.translate(Status.ABORTED.asRuntimeException()); + assertTrue(ex instanceof TxnConflictException); + assertEquals(((TxnConflictException) ex).getReason(), AbortReason.UNKNOWN); + } + + // --- Parsing robustness --- + + @Test + public void testReasonIsCaseInsensitiveAndTrimmed() { + assertEquals(conflictExceptionFor("CONFLICT: x").getReason(), AbortReason.CONFLICT); + assertEquals(conflictExceptionFor(" predicate-move : y").getReason(), AbortReason.PREDICATE_MOVE); + } + + @Test + public void testReasonWithoutDetailStillParses() { + // A bare code with no ": detail" suffix should still categorize. + assertEquals(conflictExceptionFor("conflict").getReason(), AbortReason.CONFLICT); + } + + // --- The constructor used elsewhere in the client still works --- + + @Test + public void testStringConstructorReason() { + TxnConflictException ex = new TxnConflictException("conflict: manual"); + assertEquals(ex.getReason(), AbortReason.CONFLICT); + assertTrue(ex.isRetryable()); + } + + @Test + public void testFailedPreconditionAlsoCarriesReason() { + // FAILED_PRECONDITION also maps to TxnConflictException; reason parsing applies there too. + DgraphException ex = + Exceptions.translate( + Status.Code.FAILED_PRECONDITION + .toStatus() + .withDescription("conflict: Transaction conflict") + .asRuntimeException()); + assertTrue(ex instanceof TxnConflictException); + assertEquals(((TxnConflictException) ex).getReason(), AbortReason.CONFLICT); + } +}