Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptChecker;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.AbstractTraverser;
import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
Expand All @@ -36,7 +35,6 @@
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The context of Gremlin Server within which a particular request is made.
Expand All @@ -54,8 +52,6 @@ public class Context {
private final String materializeProperties;
private final Object gremlinArgument;
private final RequestType requestType;
private HttpGremlinEndpointHandler.RequestState requestState;
private final AtomicBoolean startedResponse = new AtomicBoolean(false);
private ScheduledFuture<?> timeoutExecutor = null;
private boolean timeoutExecutorGrabbed = false;
private final Object timeoutExecutorLock = new Object();
Expand All @@ -65,14 +61,6 @@ public class Context {
public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx,
final Settings settings, final GraphManager graphManager,
final GremlinExecutor gremlinExecutor, final ScheduledExecutorService scheduledExecutorService) {
this(requestMessage, ctx, settings, graphManager, gremlinExecutor, scheduledExecutorService,
HttpGremlinEndpointHandler.RequestState.NOT_STARTED);
}

public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx,
final Settings settings, final GraphManager graphManager,
final GremlinExecutor gremlinExecutor, final ScheduledExecutorService scheduledExecutorService,
final HttpGremlinEndpointHandler.RequestState requestState) {
this.requestMessage = requestMessage;
this.channelHandlerContext = ctx;
this.settings = settings;
Expand All @@ -84,7 +72,6 @@ public Context(final RequestMessage requestMessage, final ChannelHandlerContext
final String gremlin = requestMessage.getGremlin();
this.gremlinArgument = gremlin;
this.requestType = RequestType.fromGremlin(gremlin);
this.requestState = requestState;
this.requestTimeout = determineTimeout();
this.materializeProperties = determineMaterializeProperties();
this.transactionId = requestMessage.getField(Tokens.ARGS_TRANSACTION_ID);
Expand Down Expand Up @@ -200,16 +187,6 @@ public GremlinExecutor getGremlinExecutor() {
return gremlinExecutor;
}

/**
* Gets whether the server has started processing the response for this request.
*/
public boolean getStartedResponse() { return startedResponse.get(); }

/**
* Signal that the server has started processing the response.
*/
public void setStartedResponse() { startedResponse.set(true); }

private long determineTimeout() {
// timeout override - handle both deprecated and newly named configuration. earlier logic should prevent
// both configurations from being submitted at the same time
Expand Down Expand Up @@ -250,14 +227,6 @@ public void handleDetachment(final List<Object> aggregate) {
}
}

public HttpGremlinEndpointHandler.RequestState getRequestState() {
return requestState;
}

public void setRequestState(HttpGremlinEndpointHandler.RequestState requestState) {
this.requestState = requestState;
}

/**
* Classifies an HTTP request by the kind of work it performs. Transaction control requests reuse the canonical
* Gremlin idioms ({@code g.tx().begin()} etc.) as protocol signals rather than evaluating them, because the
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,22 @@
package org.apache.tinkerpop.gremlin.server.handler;

import com.codahale.metrics.Meter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.util.CharsetUtil;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.server.GremlinServer;
import org.apache.tinkerpop.gremlin.server.util.GremlinError;
import org.apache.tinkerpop.gremlin.server.util.MetricManager;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
import org.apache.tinkerpop.shaded.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.codahale.metrics.MetricRegistry.name;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
Expand Down Expand Up @@ -92,101 +78,4 @@ public static void sendError(final ChannelHandlerContext ctx, final HttpResponse

ctx.writeAndFlush(response);
}

/**
* Writes and flushes a {@link ResponseMessage} that contains an error back to the client. Can be used to send
* errors while streaming or when no response chunk has been sent. This serves as the end of a response.
*
* @param context The netty context.
* @param responseMessage The response to send back.
* @param serializer The serializer to use to serialize the error response.
*/
static void writeError(final Context context, final ResponseMessage responseMessage, final MessageSerializer<?> serializer) {
// Prevent writing after the response is already terminated. A second write would corrupt
// HTTP framing on keep-alive connections, poisoning them for subsequent requests.
if (context.getRequestState() == HttpGremlinEndpointHandler.RequestState.ERROR ||
context.getRequestState() == HttpGremlinEndpointHandler.RequestState.FINISHED) {
return;
}

try {
final ChannelHandlerContext ctx = context.getChannelHandlerContext();
final ByteBuf ByteBuf = context.getRequestState() == HttpGremlinEndpointHandler.RequestState.STREAMING
? serializer.writeErrorFooter(responseMessage, ctx.alloc())
: serializer.serializeResponseAsBinary(responseMessage, ctx.alloc());

context.setRequestState(HttpGremlinEndpointHandler.RequestState.ERROR);

if (!ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).get()) {
sendHttpResponse(ctx,
responseMessage.getStatus().getCode(),
HttpHeaderNames.CONTENT_TYPE, ctx.channel().attr(StateKey.SERIALIZER).get().getValue0());
}

ctx.writeAndFlush(new DefaultHttpContent(ByteBuf));

sendLastHttpContent(ctx, responseMessage.getStatus().getCode(), responseMessage.getStatus().getException());
} catch (SerializationException se) {
logger.warn("Unable to serialize ResponseMessage: {} ", responseMessage);
}
}

/**
* Writes a {@link GremlinError} into the status object of a {@link ResponseMessage} and then flushes it. Used to
* send specific errors back to the client. This serves as the end of a response.
*
* @param context The netty context.
* @param error The GremlinError used to populate the status.
* @param serializer The serializer to use to serialize the error response.
*/
static void writeError(final Context context, final GremlinError error, final MessageSerializer<?> serializer) {
final ResponseMessage responseMessage = ResponseMessage.build()
.code(error.getCode())
.statusMessage(error.getMessage())
.exception(error.getException())
.create();

writeError(context, responseMessage, serializer);
}

/**
* Adds trailing headers specified in the arguments to a {@link DefaultLastHttpContent} and then flushes it. This
* serves as the end of a response.
*
* @param ctx The netty context.
* @param statusCode The status code to include in the trailers.
* @param exceptionType The type of exception to include in the trailers. Leave blank or null if no error occurred.
*/
static void sendLastHttpContent(final ChannelHandlerContext ctx, final HttpResponseStatus statusCode, final String exceptionType) {
// TODO: this might be not sent if exception occurs early so HTTP not properly terminated
final DefaultLastHttpContent defaultLastHttpContent = new DefaultLastHttpContent();
defaultLastHttpContent.trailingHeaders().add(SerTokens.TOKEN_CODE, statusCode.code());
if (exceptionType != null && !exceptionType.isEmpty()) {
defaultLastHttpContent.trailingHeaders().add(SerTokens.TOKEN_EXCEPTION, exceptionType);
}

ctx.writeAndFlush(defaultLastHttpContent);
}

/**
* Sends the initial HTTP response header with the given status and optional header pairs.
* Also marks the channel as having sent a response. Headers must be provided as alternating
* name/value pairs (e.g. {@code CONTENT_TYPE, "application/json"}).
*
* @param ctx The netty channel context.
* @param status The HTTP status code for the response.
* @param headers Alternating header name/value pairs to set on the response.
* @throws IllegalArgumentException if headers length is not even
*/
static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpResponseStatus status, final CharSequence... headers) {
if ((headers.length%2) != 0) throw new IllegalArgumentException("Headers should come in pairs.");

final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, status);
responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED);
for (int i=0; i<headers.length; i+=2) {
responseHeader.headers().set(headers[i], headers[i+1]);
}
ctx.writeAndFlush(responseHeader);
ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);
}
}
Loading
Loading