Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ on:
branches: [ "JDK17/Springboot3" ]

jobs:
test:
JDK17-Test:
runs-on: ubuntu-latest
steps:
- name: Checkout codes
Expand All @@ -35,3 +35,19 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
flags: pull-request
name: PR-Coverage
JDK21-Test:
runs-on: ubuntu-latest
steps:
- name: Checkout codes
uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.ref }}
repository: ${{ github.event.pull_request.head.repo.full_name }}
- name: Set up JDK 21
uses: actions/setup-java@v3
with:
java-version: '21'
distribution: 'temurin'
cache: maven
- name: Test with Maven
run: mvn clean install -B -U --file pom.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Tencent is pleased to support the open source community by making tRPC available.
*
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* All rights reserved.
*
* If you have downloaded a copy of the tRPC source code from Tencent,
Expand Down Expand Up @@ -32,7 +32,7 @@
public class ClientConfig extends BaseProtocolConfig {

private static final Logger logger = LoggerFactory.getLogger(ClientConfig.class);

@ConfigProperty
protected String namespace;

Expand Down Expand Up @@ -75,7 +75,7 @@ public class ClientConfig extends BaseProtocolConfig {
/**
* BackendConfig mapping.
*/
protected Map<String, BackendConfig> backendConfigMap = Maps.newHashMap();
protected Map<String, BackendConfig> backendConfigMap = Maps.newConcurrentMap();

/**
* Whether the service is registered.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,54 @@ public void testClientConfigNotEmptryBackendConfig() {
assertEquals(false, backendConfig.isIoThreadGroupShare());
assertEquals(1000, backendConfig.getIoThreads());
}
}

@Test
public void testInitAndStop() {
ClientConfig config = new ClientConfig();
config.init();
assertTrue(config.isInitialized());
config.init();
assertTrue(config.isInitialized());
config.stop();
config.stop();
}

@Test
public void testGetBackendConfig() {
ClientConfig config = new ClientConfig();
BackendConfig backendConfig = new BackendConfig();
backendConfig.setName("svc");
backendConfig.setNamingUrl("a://b");
config.addBackendConfig(backendConfig);
assertEquals(backendConfig, config.getBackendConfig("svc"));
}

@Test
public void testSetterAfterInitThrows() {
ClientConfig config = new ClientConfig();
config.init();
Exception ex = null;
try {
config.setNamespace("ns");
} catch (Exception e) {
ex = e;
}
assertTrue(ex instanceof IllegalArgumentException);
}

@Test
public void testSetBackendConfigMap() {
ClientConfig config = new ClientConfig();
java.util.Map<String, BackendConfig> map = new java.util.HashMap<>();
config.setBackendConfigMap(map);
assertEquals(map, config.getBackendConfigMap());
}

@Test
public void testSetDefaultIdempotent() {
ClientConfig config = new ClientConfig();
config.setDefault();
config.setDefault();
assertTrue(config.isSetDefault());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ private Response handleResponse(Request request, CloseableHttpResponse httpRespo
Map<String, Object> respAttachments = new HashMap<>();
for (Header header : httpResponse.getAllHeaders()) {
String name = header.getName();
for (HeaderElement element : header.getElements()) {
String value = element.getName();
respAttachments.put(name, value.getBytes(StandardCharsets.UTF_8));
}
String value = header.getValue();
respAttachments.put(name, value.getBytes(StandardCharsets.UTF_8));
}

Header contentLengthHdr = httpResponse.getFirstHeader(HttpHeaders.CONTENT_LENGTH);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Tencent is pleased to support the open source community by making tRPC available.
*
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* Copyright (C) 2023 Tencent.
* All rights reserved.
*
* If you have downloaded a copy of the tRPC source code from Tencent,
Expand All @@ -19,11 +19,11 @@
import com.tencent.trpc.core.exception.TRpcException;
import com.tencent.trpc.core.logger.Logger;
import com.tencent.trpc.core.logger.LoggerFactory;
import com.tencent.trpc.core.rpc.RpcContext;
import com.tencent.trpc.core.rpc.CallInfo;
import com.tencent.trpc.core.rpc.ProviderInvoker;
import com.tencent.trpc.core.rpc.RequestMeta;
import com.tencent.trpc.core.rpc.Response;
import com.tencent.trpc.core.rpc.RpcContext;
import com.tencent.trpc.core.rpc.RpcInvocation;
import com.tencent.trpc.core.rpc.RpcServerContext;
import com.tencent.trpc.core.rpc.common.RpcMethodInfo;
Expand All @@ -38,17 +38,19 @@
import com.tencent.trpc.proto.http.common.RpcServerContextWithHttp;
import com.tencent.trpc.proto.http.common.TrpcServletRequestWrapper;
import com.tencent.trpc.proto.http.common.TrpcServletResponseWrapper;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;

Expand All @@ -66,34 +68,41 @@ public abstract class AbstractHttpExecutor {

protected void execute(HttpServletRequest request, HttpServletResponse response,
RpcMethodInfoAndInvoker methodInfoAndInvoker) {

AtomicBoolean responded = new AtomicBoolean(false);
try {

DefRequest rpcRequest = buildDefRequest(request, response, methodInfoAndInvoker);

CountDownLatch countDownLatch = new CountDownLatch(1);
CompletableFuture<Void> completionFuture = new CompletableFuture<>();

// use a thread pool for asynchronous processing
invokeRpcRequest(methodInfoAndInvoker.getInvoker(), rpcRequest, countDownLatch);
invokeRpcRequest(methodInfoAndInvoker.getInvoker(), rpcRequest, completionFuture, responded);

// If the request carries a timeout, use this timeout to wait for the request to be processed.
// If not carried, use the default timeout.
long requestTimeout = rpcRequest.getMeta().getTimeout();
if (requestTimeout <= 0) {
requestTimeout = methodInfoAndInvoker.getInvoker().getConfig().getRequestTimeout();
}
if (requestTimeout > 0 && !countDownLatch.await(requestTimeout, TimeUnit.MILLISECONDS)) {
throw TRpcException.newFrameException(ErrorCode.TRPC_SERVER_TIMEOUT_ERR,
"wait http request execute timeout");
if (requestTimeout > 0) {
try {
completionFuture.get(requestTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
if (responded.compareAndSet(false, true)) {
doErrorReply(request, response,
TRpcException.newFrameException(ErrorCode.TRPC_SERVER_TIMEOUT_ERR,
"wait http request execute timeout"));
}
}
} else {
countDownLatch.await();
completionFuture.get();
}

} catch (Exception ex) {
logger.error("dispatch request [{}] error", request, ex);
doErrorReply(request, response, ex);
if (responded.compareAndSet(false, true)) {
doErrorReply(request, response, ex);
}
}

}

/**
Expand All @@ -108,55 +117,83 @@ protected void execute(HttpServletRequest request, HttpServletResponse response,
/**
* Request processing
*
* @param countDownLatch latch used to wait for the request processing
* @param invoker the invoker
* @param rpcRequest the rpc request
* @param completionFuture the completion future
* @param responded the responded flag
*/
private void invokeRpcRequest(ProviderInvoker<?> invoker, DefRequest rpcRequest, CountDownLatch countDownLatch) {
private void invokeRpcRequest(ProviderInvoker<?> invoker, DefRequest rpcRequest,
CompletableFuture<Void> completionFuture,
AtomicBoolean responded) {

WorkerPool workerPool = invoker.getConfig().getWorkerPoolObj();

if (null == workerPool) {
logger.error("dispatch rpcRequest [{}] error, workerPool is empty", rpcRequest);
throw TRpcException.newFrameException(ErrorCode.TRPC_SERVER_NOSERVICE_ERR,
"not found service, workerPool is empty");
completionFuture.completeExceptionally(TRpcException.newFrameException(ErrorCode.TRPC_SERVER_NOSERVICE_ERR,
"not found service, workerPool is empty"));
return;
}

workerPool.execute(() -> {

// Get the original http response
HttpServletResponse response = getOriginalResponse(rpcRequest);

// Invoke the routing implementation method to handle the request.
CompletionStage<Response> future = invoker.invoke(rpcRequest);
future.whenComplete((result, t) -> {
try {
// Throw the call exception, which will be handled uniformly by the exception handling program.
if (t != null) {
throw t;
}

// Throw a business logic exception, which will be handled uniformly
// by the exception handling program.
Throwable ex = result.getException();
if (ex != null) {
throw ex;
try {
// Get the original http response
HttpServletResponse response = getOriginalResponse(rpcRequest);
// Invoke the routing implementation method to handle the request.
CompletionStage<Response> rpcFuture = invoker.invoke(rpcRequest);

rpcFuture.whenComplete((result, throwable) -> {
try {
if (responded.get()) {
return;
}

// Throw the call exception, which will be handled uniformly by the exception handling program.
if (throwable != null) {
throw throwable;
}

// Throw a business logic exception, which will be handled uniformly
// by the exception handling program.
if (result.getException() != null) {
throw result.getException();
}

// normal response
if (responded.compareAndSet(false, true)) {
response.setStatus(HttpStatus.SC_OK);
httpCodec.writeHttpResponse(response, result);
response.flushBuffer();
}

completionFuture.complete(null);
} catch (Throwable t) {
handleError(t, rpcRequest, response, responded, completionFuture);
}
});

// normal response
response.setStatus(HttpStatus.SC_OK);
httpCodec.writeHttpResponse(response, result);
response.flushBuffer();
} catch (Throwable e) {
HttpServletRequest request = getOriginalRequest(rpcRequest);
logger.warn("reply message error, channel: [{}], msg:[{}]", request.getRemoteAddr(), request, e);
httpErrorReply(request, response,
ErrorResponse.create(request, HttpStatus.SC_SERVICE_UNAVAILABLE, e));
} finally {
countDownLatch.countDown();
}
});
} catch (Exception e) {
handleError(e, rpcRequest, getOriginalResponse(rpcRequest), responded, completionFuture);
}
});
}

/**
* Handle error
*/
private void handleError(Throwable t, DefRequest rpcRequest, HttpServletResponse response,
AtomicBoolean responded, CompletableFuture<Void> completionFuture) {
try {
if (responded.compareAndSet(false, true)) {
HttpServletRequest request = getOriginalRequest(rpcRequest);
logger.warn("reply message error, channel: [{}], msg:[{}]", request.getRemoteAddr(), request, t);
httpErrorReply(request, response, ErrorResponse.create(request, HttpStatus.SC_SERVICE_UNAVAILABLE, t));
}
} finally {
completionFuture.completeExceptionally(t);
}
}

/**
* Build the context request.
*
Expand Down Expand Up @@ -392,7 +429,6 @@ private void setRpcServerContext(HttpServletRequest request, HttpServletResponse
// to maintain consistency.
rpcRequest.getAttachments().put(header, value.getBytes(StandardCharsets.UTF_8));
}
logger.debug("request attachment: {}", JsonUtils.toJson(rpcRequest.getAttachments()));
}

/**
Expand Down Expand Up @@ -488,4 +524,4 @@ private String getString(String[] callInfos, int length, int cursor) {
return callInfos.length < length ? StringUtils.EMPTY : callInfos[cursor];
}

}
}
Loading
Loading