diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java index c7c5a610e34..51373d77410 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java @@ -23,7 +23,6 @@ import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor; import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptChecker; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.AbstractTraverser; -import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory; @@ -36,7 +35,6 @@ import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; /** * The context of Gremlin Server within which a particular request is made. @@ -54,8 +52,6 @@ public class Context { private final String materializeProperties; private final Object gremlinArgument; private final RequestType requestType; - private HttpGremlinEndpointHandler.RequestState requestState; - private final AtomicBoolean startedResponse = new AtomicBoolean(false); private ScheduledFuture timeoutExecutor = null; private boolean timeoutExecutorGrabbed = false; private final Object timeoutExecutorLock = new Object(); @@ -65,14 +61,6 @@ public class Context { public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx, final Settings settings, final GraphManager graphManager, final GremlinExecutor gremlinExecutor, final ScheduledExecutorService scheduledExecutorService) { - this(requestMessage, ctx, settings, graphManager, gremlinExecutor, scheduledExecutorService, - HttpGremlinEndpointHandler.RequestState.NOT_STARTED); - } - - public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx, - final Settings settings, final GraphManager graphManager, - final GremlinExecutor gremlinExecutor, final ScheduledExecutorService scheduledExecutorService, - final HttpGremlinEndpointHandler.RequestState requestState) { this.requestMessage = requestMessage; this.channelHandlerContext = ctx; this.settings = settings; @@ -84,7 +72,6 @@ public Context(final RequestMessage requestMessage, final ChannelHandlerContext final String gremlin = requestMessage.getGremlin(); this.gremlinArgument = gremlin; this.requestType = RequestType.fromGremlin(gremlin); - this.requestState = requestState; this.requestTimeout = determineTimeout(); this.materializeProperties = determineMaterializeProperties(); this.transactionId = requestMessage.getField(Tokens.ARGS_TRANSACTION_ID); @@ -200,16 +187,6 @@ public GremlinExecutor getGremlinExecutor() { return gremlinExecutor; } - /** - * Gets whether the server has started processing the response for this request. - */ - public boolean getStartedResponse() { return startedResponse.get(); } - - /** - * Signal that the server has started processing the response. - */ - public void setStartedResponse() { startedResponse.set(true); } - private long determineTimeout() { // timeout override - handle both deprecated and newly named configuration. earlier logic should prevent // both configurations from being submitted at the same time @@ -250,14 +227,6 @@ public void handleDetachment(final List aggregate) { } } - public HttpGremlinEndpointHandler.RequestState getRequestState() { - return requestState; - } - - public void setRequestState(HttpGremlinEndpointHandler.RequestState requestState) { - this.requestState = requestState; - } - /** * Classifies an HTTP request by the kind of work it performs. Transaction control requests reuse the canonical * Gremlin idioms ({@code g.tx().begin()} etc.) as protocol signals rather than evaluating them, because the diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index 40f0d2b79fd..2248044a28b 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -20,13 +20,11 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; -import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -66,7 +64,6 @@ import org.apache.tinkerpop.gremlin.util.Tokens; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; -import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; import org.codehaus.groovy.control.MultipleCompilationErrorsException; import org.javatuples.Pair; @@ -103,14 +100,6 @@ import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING; import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE; import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; -import static io.netty.handler.codec.http.HttpResponseStatus.OK; -import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.FINISHED; -import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.FINISHING; -import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.NOT_STARTED; -import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.STREAMING; -import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendHttpResponse; -import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendLastHttpContent; -import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.writeError; /** * Handler that processes RequestMessage. This handler will attempt to execute the query and stream the results back @@ -184,11 +173,11 @@ public HttpGremlinEndpointHandler(final GremlinExecutor gremlinExecutor, @Override public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage requestMessage) { - ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(false); final Pair> serializer = ctx.channel().attr(StateKey.SERIALIZER).get(); final Context requestCtx = new Context(requestMessage, ctx, settings, graphManager, gremlinExecutor, - gremlinExecutor.getScheduledExecutorService(), NOT_STARTED); + gremlinExecutor.getScheduledExecutorService()); + final HttpResponseCoordinator coordinator = new HttpResponseCoordinator(requestCtx, serializer.getValue0(), serializer.getValue1()); final Timer.Context timerContext = evalOpTimer.time(); // timeout override - handle both deprecated and newly named configuration. earlier logic should prevent @@ -197,8 +186,6 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r final long seto = (null != timeoutMs) ? timeoutMs : requestCtx.getSettings().getEvaluationTimeout(); final FutureTask evalFuture = new FutureTask<>(() -> { - requestCtx.setStartedResponse(); - try { logger.debug("Processing request containing script [{}] and bindings of [{}] on {}", requestMessage.getFieldOrDefault(Tokens.ARGS_GREMLIN, ""), @@ -283,15 +270,13 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r // Send back the 200 OK response header here since the response is always chunk transfer encoded. Any // failures that follow this will show up in the response body instead. - sendHttpResponse(ctx, OK, createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new)); - sendHttpContents(ctx, requestCtx); - // Skip if writeError() already terminated the response (e.g., serialization error in makeChunk). - // Sending a second LastHttpContent would corrupt the HTTP framing on keep-alive connections. - if (requestCtx.getRequestState() != RequestState.ERROR) { - sendLastHttpContent(ctx, HttpResponseStatus.OK, ""); - } + coordinator.writeHeader(createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new)); + sendHttpContents(ctx, requestCtx, coordinator); + // Idempotent terminal call: if the data path already terminated the response, complete() is a no-op + // via its COMPLETED short-circuit. Otherwise it writes the terminal LastHttpContent. + coordinator.complete(HttpResponseStatus.OK, ""); } catch (Throwable t) { - writeError(requestCtx, formErrorResponseMessage(t, requestMessage), serializer.getValue1()); + coordinator.writeError(formErrorResponseMessage(t, requestMessage)); } finally { timerContext.stop(); @@ -313,18 +298,18 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r transactionManager.get(requestCtx.getTransactionId()).get().submit(evalFuture) : requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture); if (seto > 0) { - // Schedule a timeout in the thread pool for future execution + // Schedule a timeout in the thread pool for future execution. The coordinator's monitor guarantees + // exactly one response: whichever of this timeout task or the eval task terminates the response + // first wins, and the other's write becomes a no-op. requestCtx.setTimeoutExecutor(requestCtx.getScheduledExecutorService().schedule(() -> { executionFuture.cancel(true); - if (!requestCtx.getStartedResponse()) { - writeError(requestCtx, GremlinError.timeout(requestMessage), serializer.getValue1()); - } + coordinator.writeError(GremlinError.timeout(requestMessage)); }, seto, TimeUnit.MILLISECONDS)); } } catch (RejectedExecutionException ree) { - writeError(requestCtx, GremlinError.rateLimiting(), serializer.getValue1()); + coordinator.writeError(GremlinError.rateLimiting()); } catch (NoSuchElementException | IllegalStateException nsee) { - writeError(requestCtx, GremlinError.transactionNotFound(requestCtx.getTransactionId()), serializer.getValue1()); + coordinator.writeError(GremlinError.transactionNotFound(requestCtx.getTransactionId())); } } @@ -345,7 +330,8 @@ private List createResponseHeaders(final ChannelHandlerContext ctx return headers; } - private void sendHttpContents(final ChannelHandlerContext ctx, final Context requestContext) throws Exception { + private void sendHttpContents(final ChannelHandlerContext ctx, final Context requestContext, + final HttpResponseCoordinator coordinator) throws Exception { final Pair> serializer = ctx.channel().attr(StateKey.SERIALIZER).get(); final RequestMessage request = requestContext.getRequestMessage(); final String txId = requestContext.getTransactionId(); @@ -355,14 +341,14 @@ private void sendHttpContents(final ChannelHandlerContext ctx, final Context req if ((txId != null) && transaction.isEmpty()) throw new ProcessingException(GremlinError.transactionNotFound(txId)); if (requestContext.isTransactionBegin()) { - runBegin(requestContext, transaction.get(), serializer); + runBegin(transaction.get(), coordinator); } else if (requestContext.isTransactionCommit()) { - handleGraphOp(requestContext, txId, Transaction::commit, serializer); + handleGraphOp(requestContext, txId, Transaction::commit, coordinator); } else if (requestContext.isTransactionRollback()) { - handleGraphOp(requestContext, txId, Transaction::rollback, serializer); + handleGraphOp(requestContext, txId, Transaction::rollback, coordinator); } else { // Both transactional and non-transactional traversals follow this path for response chunking. - iterateScriptEvalResult(requestContext, serializer.getValue1(), request); + iterateScriptEvalResult(requestContext, serializer.getValue1(), request, coordinator); } } @@ -416,7 +402,8 @@ private GremlinError formErrorResponseMessage(Throwable t, RequestMessage reques return GremlinError.general(t); } - private void iterateScriptEvalResult(final Context context, MessageSerializer serializer, final RequestMessage message) + private void iterateScriptEvalResult(final Context context, MessageSerializer serializer, final RequestMessage message, + final HttpResponseCoordinator coordinator) throws ProcessingException, InterruptedException, ScriptException { final Map args = message.getFields(); final String language = args.containsKey(Tokens.ARGS_LANGUAGE) ? (String) args.get(Tokens.ARGS_LANGUAGE) : "gremlin-lang"; @@ -460,10 +447,10 @@ private void iterateScriptEvalResult(final Context context, MessageSerializer // optimization for driver requests ((Traversal.Admin) result).applyStrategies(); itty = new TraverserIterator((Traversal.Admin) result); - handleIterator(context, itty, serializer, true); + handleIterator(context, itty, coordinator, true); } else { itty = IteratorUtils.asIterator(result); - handleIterator(context, itty, serializer, false); + handleIterator(context, itty, coordinator, false); } if (autoCommit && graph.tx().isOpen()) graph.tx().commit(); @@ -516,20 +503,18 @@ private void doBegin(final Context ctx) throws Exception { } } - private void runBegin(final Context ctx, UnmanagedTransaction tx, final Pair> serializer) throws Exception { - final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(), List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, tx.getTransactionId())), false, false); - ctx.getChannelHandlerContext().writeAndFlush(new DefaultHttpContent(chunk)); + private void runBegin(final UnmanagedTransaction tx, final HttpResponseCoordinator coordinator) throws Exception { + coordinator.writeData(List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, tx.getTransactionId())), false, false); } private void handleGraphOp(final Context ctx, final String transactionId, final Consumer graphOp, - final Pair> serializer) throws Exception { + final HttpResponseCoordinator coordinator) throws Exception { final Graph graph = graphManager.getTraversalSource(ctx.getRequestMessage().getField(Tokens.ARGS_G)).getGraph(); graphOp.accept(graph.tx()); transactionManager.get(transactionId).ifPresent(tx -> tx.close(true)); - final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(), List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, transactionId)), false, false); - ctx.getChannelHandlerContext().writeAndFlush(new DefaultHttpContent(chunk)); + coordinator.writeData(List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, transactionId)), false, false); } private Bindings mergeBindingsFromRequest(final Context ctx, final Bindings bindings) throws ProcessingException { @@ -570,7 +555,7 @@ private Bindings mergeBindingsFromRequest(final Context ctx, final Bindings bind return bindings; } - private void handleIterator(final Context context, final Iterator itty, final MessageSerializer serializer, final boolean bulking) throws InterruptedException { + private void handleIterator(final Context context, final Iterator itty, final HttpResponseCoordinator coordinator, final boolean bulking) throws InterruptedException { final ChannelHandlerContext nettyContext = context.getChannelHandlerContext(); final RequestMessage msg = context.getRequestMessage(); final Settings settings = context.getSettings(); @@ -582,14 +567,11 @@ private void handleIterator(final Context context, final Iterator itty, final Me // we have an empty iterator - happens on stuff like: g.V().iterate() if (!itty.hasNext()) { - ByteBuf chunk = null; try { - chunk = makeChunk(context, serializer, new ArrayList<>(), false, bulking); - nettyContext.writeAndFlush(new DefaultHttpContent(chunk)); + coordinator.writeData(new ArrayList<>(), false, bulking); } catch (Exception ex) { - // Bytebuf is a countable release - if it does not get written downstream - // it needs to be released here - if (chunk != null) chunk.release(); + // serialization error is written back to the driver inside writeData (which terminates the + // response); nothing further to do here. } return; } @@ -646,37 +628,25 @@ private void handleIterator(final Context context, final Iterator itty, final Me // results till the timeout. checking for isActive() should help prevent that. if (nettyContext.channel().isActive() && nettyContext.channel().isWritable()) { if (aggregate.size() == resultIterationBatchSize || !itty.hasNext()) { - ByteBuf chunk = null; - try { - chunk = makeChunk(context, serializer, aggregate, itty.hasNext(), bulking); - } catch (Exception ex) { - // Bytebuf is a countable release - if it does not get written downstream - // it needs to be released here - if (chunk != null) chunk.release(); - - // exception is handled in makeFrame() - serialization error gets written back to driver - // at that point - break; - } - // track whether there is anything left in the iterator because it needs to be accessed after // the transaction could be closed - in that case a call to hasNext() could open a new transaction - // unintentionally + // unintentionally. compute it before writing so the (possibly tx-closing) hasNext() does not run + // under the coordinator monitor. hasMore = itty.hasNext(); + final List page = aggregate; + // only need a fresh aggregation list if there's more stuff to write + if (hasMore) { + aggregate = new ArrayList<>(resultIterationBatchSize); + } + try { - // only need to reset the aggregation list if there's more stuff to write - if (hasMore) { - aggregate = new ArrayList<>(resultIterationBatchSize); - } + coordinator.writeData(page, hasMore, bulking); } catch (Exception ex) { - // Bytebuf is a countable release - if it does not get written downstream - // it needs to be released here - if (chunk != null) chunk.release(); - throw ex; + // serialization error gets written back to the driver inside writeData (which terminates + // the response); stop iterating. + break; } - - nettyContext.writeAndFlush(new DefaultHttpContent(chunk)); } } else { final long currentTime = System.currentTimeMillis(); @@ -724,73 +694,4 @@ private boolean acceptsDeflateEncoding(List encodings) { return false; } - private static ByteBuf makeChunk(final Context ctx, final MessageSerializer serializer, - final List aggregate, final boolean hasMore, - final boolean bulking) throws Exception { - try { - final ChannelHandlerContext nettyContext = ctx.getChannelHandlerContext(); - - ctx.handleDetachment(aggregate); - - if (!hasMore && ctx.getRequestState() == STREAMING) { - ctx.setRequestState(FINISHING); - } - - ResponseMessage responseMessage = null; - - // for this state no need to build full ResponseMessage - if (ctx.getRequestState() != STREAMING) { - final ResponseMessage.Builder builder = ResponseMessage.build().result(aggregate); - - // need to put status in last message - if (ctx.getRequestState() == FINISHING) { - builder.code(HttpResponseStatus.OK); - } - - builder.bulked(bulking); - - responseMessage = builder.create(); - } - - switch (ctx.getRequestState()) { - case NOT_STARTED: - if (hasMore) { - ctx.setRequestState(STREAMING); - return serializer.writeHeader(responseMessage, nettyContext.alloc()); - } - - final ByteBuf fullResponse = serializer.serializeResponseAsBinary(ResponseMessage.build() - .result(aggregate) - .bulked(bulking) - .code(HttpResponseStatus.OK) - .create(), nettyContext.alloc()); - ctx.setRequestState(FINISHED); - return fullResponse; - - case STREAMING: - return serializer.writeChunk(aggregate, nettyContext.alloc()); - case FINISHING: - final ByteBuf footer = serializer.writeFooter(responseMessage, nettyContext.alloc()); - ctx.setRequestState(FINISHED); - return footer; - } - - return serializer.serializeResponseAsBinary(responseMessage, nettyContext.alloc()); - - } catch (Exception ex) { - final UUID requestId = ctx.getChannelHandlerContext().attr(StateKey.REQUEST_ID).get(); - logger.warn("The result [{}] in the request {} could not be serialized and returned.", aggregate, requestId, ex); - writeError(ctx, GremlinError.serialization(ex), serializer); - throw ex; - } - } - - public enum RequestState { - NOT_STARTED, - STREAMING, - // last portion of data - FINISHING, - FINISHED, - ERROR - } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java index 1e5f9a0a7ce..8a4d5e316ad 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java @@ -19,27 +19,15 @@ package org.apache.tinkerpop.gremlin.server.handler; import com.codahale.metrics.Meter; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.DefaultHttpContent; -import io.netty.handler.codec.http.DefaultHttpResponse; -import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.util.CharsetUtil; -import org.apache.tinkerpop.gremlin.server.Context; import org.apache.tinkerpop.gremlin.server.GremlinServer; -import org.apache.tinkerpop.gremlin.server.util.GremlinError; import org.apache.tinkerpop.gremlin.server.util.MetricManager; -import org.apache.tinkerpop.gremlin.util.MessageSerializer; -import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; -import org.apache.tinkerpop.gremlin.util.ser.SerTokens; -import org.apache.tinkerpop.gremlin.util.ser.SerializationException; import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper; import org.apache.tinkerpop.shaded.jackson.databind.node.ObjectNode; import org.slf4j.Logger; @@ -47,8 +35,6 @@ import static com.codahale.metrics.MetricRegistry.name; import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; -import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING; -import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; /** @@ -92,101 +78,4 @@ public static void sendError(final ChannelHandlerContext ctx, final HttpResponse ctx.writeAndFlush(response); } - - /** - * Writes and flushes a {@link ResponseMessage} that contains an error back to the client. Can be used to send - * errors while streaming or when no response chunk has been sent. This serves as the end of a response. - * - * @param context The netty context. - * @param responseMessage The response to send back. - * @param serializer The serializer to use to serialize the error response. - */ - static void writeError(final Context context, final ResponseMessage responseMessage, final MessageSerializer serializer) { - // Prevent writing after the response is already terminated. A second write would corrupt - // HTTP framing on keep-alive connections, poisoning them for subsequent requests. - if (context.getRequestState() == HttpGremlinEndpointHandler.RequestState.ERROR || - context.getRequestState() == HttpGremlinEndpointHandler.RequestState.FINISHED) { - return; - } - - try { - final ChannelHandlerContext ctx = context.getChannelHandlerContext(); - final ByteBuf ByteBuf = context.getRequestState() == HttpGremlinEndpointHandler.RequestState.STREAMING - ? serializer.writeErrorFooter(responseMessage, ctx.alloc()) - : serializer.serializeResponseAsBinary(responseMessage, ctx.alloc()); - - context.setRequestState(HttpGremlinEndpointHandler.RequestState.ERROR); - - if (!ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).get()) { - sendHttpResponse(ctx, - responseMessage.getStatus().getCode(), - HttpHeaderNames.CONTENT_TYPE, ctx.channel().attr(StateKey.SERIALIZER).get().getValue0()); - } - - ctx.writeAndFlush(new DefaultHttpContent(ByteBuf)); - - sendLastHttpContent(ctx, responseMessage.getStatus().getCode(), responseMessage.getStatus().getException()); - } catch (SerializationException se) { - logger.warn("Unable to serialize ResponseMessage: {} ", responseMessage); - } - } - - /** - * Writes a {@link GremlinError} into the status object of a {@link ResponseMessage} and then flushes it. Used to - * send specific errors back to the client. This serves as the end of a response. - * - * @param context The netty context. - * @param error The GremlinError used to populate the status. - * @param serializer The serializer to use to serialize the error response. - */ - static void writeError(final Context context, final GremlinError error, final MessageSerializer serializer) { - final ResponseMessage responseMessage = ResponseMessage.build() - .code(error.getCode()) - .statusMessage(error.getMessage()) - .exception(error.getException()) - .create(); - - writeError(context, responseMessage, serializer); - } - - /** - * Adds trailing headers specified in the arguments to a {@link DefaultLastHttpContent} and then flushes it. This - * serves as the end of a response. - * - * @param ctx The netty context. - * @param statusCode The status code to include in the trailers. - * @param exceptionType The type of exception to include in the trailers. Leave blank or null if no error occurred. - */ - static void sendLastHttpContent(final ChannelHandlerContext ctx, final HttpResponseStatus statusCode, final String exceptionType) { - // TODO: this might be not sent if exception occurs early so HTTP not properly terminated - final DefaultLastHttpContent defaultLastHttpContent = new DefaultLastHttpContent(); - defaultLastHttpContent.trailingHeaders().add(SerTokens.TOKEN_CODE, statusCode.code()); - if (exceptionType != null && !exceptionType.isEmpty()) { - defaultLastHttpContent.trailingHeaders().add(SerTokens.TOKEN_EXCEPTION, exceptionType); - } - - ctx.writeAndFlush(defaultLastHttpContent); - } - - /** - * Sends the initial HTTP response header with the given status and optional header pairs. - * Also marks the channel as having sent a response. Headers must be provided as alternating - * name/value pairs (e.g. {@code CONTENT_TYPE, "application/json"}). - * - * @param ctx The netty channel context. - * @param status The HTTP status code for the response. - * @param headers Alternating header name/value pairs to set on the response. - * @throws IllegalArgumentException if headers length is not even - */ - static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpResponseStatus status, final CharSequence... headers) { - if ((headers.length%2) != 0) throw new IllegalArgumentException("Headers should come in pairs."); - - final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, status); - responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED); - for (int i=0; i + * All mutating methods are guarded by this object's monitor, so the eval-worker thread and the timeout-scheduler + * thread cannot interleave their writes. The {@link State#COMPLETED} short-circuit makes the terminal-write methods + * idempotent: whichever thread terminates the response first wins, and the others become no-ops. The eval task calls + * {@link #complete} from a {@code finally} block, so the terminal {@code LastHttpContent} (which clears the channel's + * in-use flag and ends the chunked stream) is written even if the body-producing code threw. + *

+ * Locking discipline (IMPORTANT for maintainers): the {@code state} and {@code headerSent} fields are guarded by + * this object's monitor. The {@code public}/package methods are {@code synchronized} and are the only valid entry + * points. The private helpers ({@code ensureHeaderSent}, {@code writeTerminal}) mutate or rely on that guarded state + * but are deliberately NOT {@code synchronized} — they assume the caller already holds the monitor. Only ever call + * them from a {@code synchronized} method, and only ever read/write {@code state}/{@code headerSent} while holding + * the monitor. Do not add an entry point that touches this state without {@code synchronized}. + */ +final class HttpResponseCoordinator { + + private static final Logger logger = LoggerFactory.getLogger(HttpResponseCoordinator.class); + + private enum State { + /** No response body has been written yet. */ + NOT_STARTED, + /** The opening framing (header chunk) has been written; more chunks or a footer are expected. */ + STREAMING, + /** The response has been fully terminated (terminal {@code LastHttpContent} written); no further writes. */ + COMPLETED + } + + private final Context context; + private final MessageSerializer serializer; + private final String contentType; + + private State state = State.NOT_STARTED; + private boolean headerSent = false; + + HttpResponseCoordinator(final Context context, final String contentType, final MessageSerializer serializer) { + this.context = context; + this.contentType = contentType; + this.serializer = serializer; + } + + /** + * Sends the {@code 200 OK} chunked response header exactly once. The header pairs are supplied by the caller + * because, for a begin-transaction request, the transaction id header is only known once the transaction has been + * opened. No-op if the response was already terminated or the header was already sent. + */ + synchronized void writeHeader(final CharSequence... headers) { + if (state == State.COMPLETED) return; + ensureHeaderSent(HttpResponseStatus.OK, headers); + } + + /** + * Writes one page of results. On the final page ({@code hasMore == false}) this also writes the body footer and + * the terminal {@code LastHttpContent}, transitioning to {@link State#COMPLETED}. No-op if already completed. + *

+ * On a serialization failure the error is written through {@link #writeError} (terminating the response) and the + * exception is re-thrown so the caller can stop iterating. + */ + synchronized void writeData(final List aggregate, final boolean hasMore, final boolean bulking) throws Exception { + if (state == State.COMPLETED) return; + + final ChannelHandlerContext nettyContext = context.getChannelHandlerContext(); + // backstop: the eval task sends the header before iterating, but guarantee it here too. + ensureHeaderSent(HttpResponseStatus.OK, HttpHeaderNames.CONTENT_TYPE, contentType); + + final boolean firstChunk = state == State.NOT_STARTED; + final boolean terminal = !hasMore; + + final ByteBuf chunk; + try { + // detachment runs inside the try so a failure here is reported to the client as a serialization error + // (matching the prior makeChunk behavior) rather than escaping uncaught. + context.handleDetachment(aggregate); + + // An intermediate streaming chunk carries only data (no status, no ResponseMessage). Every other case + // builds a full ResponseMessage; the terminal page additionally carries the OK status. The four cases map + // onto distinct serializer framing calls: single-shot (first+terminal), header (first+more), + // footer (streaming+terminal), and chunk (streaming+more). + if (!firstChunk && !terminal) { + chunk = serializer.writeChunk(aggregate, nettyContext.alloc()); + } else { + final ResponseMessage.Builder builder = ResponseMessage.build().result(aggregate).bulked(bulking); + if (terminal) builder.code(HttpResponseStatus.OK); + final ResponseMessage responseMessage = builder.create(); + + if (firstChunk && terminal) { + chunk = serializer.serializeResponseAsBinary(responseMessage, nettyContext.alloc()); + } else if (firstChunk) { + state = State.STREAMING; + chunk = serializer.writeHeader(responseMessage, nettyContext.alloc()); + } else { + chunk = serializer.writeFooter(responseMessage, nettyContext.alloc()); + } + } + } catch (Exception ex) { + final UUID requestId = nettyContext.attr(StateKey.REQUEST_ID).get(); + logger.warn("The result [{}] in the request {} could not be serialized and returned.", aggregate, requestId, ex); + writeError(GremlinError.serialization(ex)); + throw ex; + } + + nettyContext.writeAndFlush(new DefaultHttpContent(chunk)); + + // The final page closes the body framing above; now end the chunked stream. + if (!hasMore) { + writeTerminal(HttpResponseStatus.OK, ""); + state = State.COMPLETED; + } + } + + /** + * Writes an error response and terminates the stream. Serializes an error footer when mid-stream or a complete + * message otherwise. No-op if the response was already terminated. Never throws on serialization failure (logs + * instead) so the terminal {@code LastHttpContent} is still written. + */ + synchronized void writeError(final ResponseMessage responseMessage) { + if (state == State.COMPLETED) return; + + final ChannelHandlerContext ctx = context.getChannelHandlerContext(); + try { + final ByteBuf byteBuf = state == State.STREAMING + ? serializer.writeErrorFooter(responseMessage, ctx.alloc()) + : serializer.serializeResponseAsBinary(responseMessage, ctx.alloc()); + + // an error response that has not yet emitted a header carries the error status code on the header line, + // matching the prior behavior of HttpHandlerUtil.writeError. + ensureHeaderSent(responseMessage.getStatus().getCode(), HttpHeaderNames.CONTENT_TYPE, contentType); + + ctx.writeAndFlush(new DefaultHttpContent(byteBuf)); + writeTerminal(responseMessage.getStatus().getCode(), responseMessage.getStatus().getException()); + state = State.COMPLETED; + } catch (SerializationException se) { + logger.warn("Unable to serialize ResponseMessage: {} ", responseMessage); + } + } + + /** + * Writes an error response built from a {@link GremlinError}. + */ + synchronized void writeError(final GremlinError error) { + writeError(ResponseMessage.build() + .code(error.getCode()) + .statusMessage(error.getMessage()) + .exception(error.getException()) + .create()); + } + + /** + * Terminal call from the eval task's {@code finally}. Idempotent: writes the terminal {@code LastHttpContent} + * only if the response was not already completed (by the final data page or an error). This guarantees the + * chunked stream is always ended even when the body-producing code threw an unchecked exception. + */ + synchronized void complete(final HttpResponseStatus status, final String exceptionType) { + if (state == State.COMPLETED) return; + ensureHeaderSent(status, HttpHeaderNames.CONTENT_TYPE, contentType); + writeTerminal(status, exceptionType); + state = State.COMPLETED; + } + + // Caller must hold this object's monitor: reads and mutates the guarded {@code headerSent} field. Only ever + // invoked from the synchronized public methods; intentionally not synchronized itself (see class javadoc). + private void ensureHeaderSent(final HttpResponseStatus status, final CharSequence... headers) { + if (headerSent) return; + if ((headers.length % 2) != 0) throw new IllegalArgumentException("Headers should come in pairs."); + + final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, status); + responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED); + for (int i = 0; i < headers.length; i += 2) { + responseHeader.headers().set(headers[i], headers[i + 1]); + } + context.getChannelHandlerContext().writeAndFlush(responseHeader); + headerSent = true; + } + + // Caller must hold this object's monitor: invoked only from the synchronized terminal-write methods as part of + // their guarded state transition to COMPLETED. Intentionally not synchronized itself (see class javadoc). + private void writeTerminal(final HttpResponseStatus statusCode, final String exceptionType) { + final DefaultLastHttpContent last = new DefaultLastHttpContent(); + last.trailingHeaders().add(SerTokens.TOKEN_CODE, statusCode.code()); + if (exceptionType != null && !exceptionType.isEmpty()) { + last.trailingHeaders().add(SerTokens.TOKEN_EXCEPTION, exceptionType); + } + context.getChannelHandlerContext().writeAndFlush(last); + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java index 78499515423..b8727cf3cc3 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java @@ -50,11 +50,6 @@ private StateKey() {} */ public static final AttributeKey REQUEST_ID = AttributeKey.valueOf("requestId"); - /** - * The key for whether a {@link io.netty.handler.codec.http.HttpResponse} has been sent for the current response. - */ - public static final AttributeKey HTTP_RESPONSE_SENT = AttributeKey.valueOf("responseSent"); - /** * The key for the current {@link AuthenticatedUser}. */ diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java index c3d3b8a2746..3dfc3942046 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java @@ -165,12 +165,24 @@ public synchronized void close(boolean force) { } } - // prevent any additional requests from processing. if the kill was not "forced" then jobs were scheduled to - // try to rollback open transactions. those jobs either timed-out or completed successfully. either way, no - // additional jobs will be allowed, running jobs will be cancelled (if possible) and any scheduled jobs will - // be cancelled - executor.shutdownNow(); + // ORDERING IS LOAD-BEARING: manager.destroy(transactionId) MUST happen before executor.shutdown(). + // + // We use a graceful shutdown() rather than shutdownNow(). A sibling request for this transaction may already + // be queued behind the commit/rollback that triggered this close (single-threaded executor). shutdownNow() + // would drain and silently discard those queued tasks, leaving their HTTP clients with no response (a hang) - + // this was observed as an intermittent CI hang. shutdown() instead lets each queued task run to completion + // (so every request gets a response) while still terminating the worker thread once the queue drains, so the + // transaction thread is not leaked. shutdown() also avoids the self-interrupt shutdownNow() caused when close() + // runs on the tx thread itself (commit/rollback), which could corrupt the in-flight response write. + // + // Because those queued tasks now actually RUN, correctness depends on them NOT mutating a committed/rolled-back + // transaction. Removing the transaction from the manager FIRST guarantees that: when a queued sibling task + // runs, its pre-evaluation guard (transactionManager.get(txId)) finds nothing and fails fast with a 404 + // (transaction not found) before reaching evaluation. If the destroy were moved after shutdown(), a queued + // task (e.g. an addV submitted after a commit) could execute against the transaction and leak data. Do not + // reorder these two statements. manager.destroy(transactionId); + executor.shutdown(); Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> f.cancel(true)); logger.debug("Transaction {} closed", transactionId); } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java index ee881d9b8ca..c67a0add0d3 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java @@ -948,8 +948,21 @@ public void should500OnPOSTWithEvaluationTimeout() throws Exception { } }; + // The second query also times out; additionally assert that the timed-out response is exactly one + // well-formed, terminated chunked response. The timeout path is the cross-thread case (the scheduler thread + // calls the coordinator's writeError concurrently with the eval worker), so this guards that the single- + // response + termination guarantee holds when the second writer is the timeout, not just the eval failure + // path covered by the shouldHandleErrors* tests. EntityUtils.toString only returns once the chunked entity + // is fully read including the terminal zero-length chunk, so a non-terminated response would block until the + // @Test timeout. final Callable secondQueryWrapper = () -> { try (final CloseableHttpResponse response = secondClient.execute(secondPost)) { + assertTrue(response.getEntity().isChunked()); + final JsonNode node = mapper.readTree(EntityUtils.toString(response.getEntity())); + assertEquals(500, node.get("status").get("code").intValue()); + final Header[] footers = getTrailingHeaders(response); + assertEquals("code", footers[0].getName()); + assertEquals("500", footers[0].getValue()); return response.getStatusLine().getStatusCode(); } }; @@ -964,6 +977,7 @@ public void should500OnPOSTWithEvaluationTimeout() throws Exception { threadPool.shutdown(); } + @Test public void shouldErrorWhenTryingToConnectWithHttp1() throws Exception { final CloseableHttpClient httpclient = HttpClients.createDefault(); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java index 156b2773dd7..95895e66b46 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java @@ -467,13 +467,19 @@ public void shouldReturn404ForAllOperationsOnClosedTransaction() throws Exceptio public void shouldNotLeakDataWhenTraversalQueuedBehindCommit() throws Exception { final String txId = beginTx(client, GTX); - // add vertices and an edge in the transaction + // add two vertices and an edge between them in the transaction. the edge is required so that the + // "long traversal" below (repeat(both())) actually has something to traverse and keeps the single + // transaction executor occupied while the commit and short query queue behind it. without an edge, + // both() yields nothing and the traversal returns immediately, defeating the ordering this test relies on. try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV().property(T.id, 1)", GTX)) { assertEquals(200, r.getStatusLine().getStatusCode()); } try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV().property(T.id, 2)", GTX)) { assertEquals(200, r.getStatusLine().getStatusCode()); } + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V(1).addE('self').to(__.V(2))", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } // Fire three requests concurrently: a long traversal to occupy the server executor, then a commit that queues // behind it, then a short query that queues behind the commit. The short query should fail with 404 because the