
refactor: implement zero-copy for Triple client #16072

Open
EarthChen wants to merge 6 commits into apache:3.3 from EarthChen:refactor-tri-client-mesasge-stream

Conversation

@EarthChen
Member

What is the purpose of the change?

Checklist

  • Make sure there is a GitHub issue for the change.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write the necessary unit tests to verify your logic is correct. If a new feature or significant change is committed, please remember to add a sample to the dubbo samples project.
  • Make sure GitHub Actions can pass. If the workflow is failing, find out why and how to fix it.

Contributor

Copilot AI left a comment


Pull request overview

This pull request refactors the Triple client to implement zero-copy compression, eliminating the need to buffer entire messages in memory before compression. The change moves from a byte-array-based compression model to a streaming model where data is compressed directly into Netty ByteBuf instances.
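To make the streaming model concrete, here is a minimal, self-contained sketch (not the PR's actual code): it compresses an InputStream straight into a Netty ByteBuf through a decorated ByteBufOutputStream, using plain java.util.zip.GZIPOutputStream as a stand-in for Dubbo's pluggable Compressor and an Unpooled buffer in place of the client's real allocator.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;

// Sketch only: compress a message stream directly into a Netty ByteBuf by
// decorating a ByteBufOutputStream, instead of compressing into an
// intermediate byte[] first. Gzip stands in for the pluggable Compressor.
public final class ZeroCopyCompressSketch {

    static ByteBuf compressInto(InputStream message) throws IOException {
        ByteBuf buf = Unpooled.buffer();
        // The decorator writes compressed bytes directly into the ByteBuf.
        try (OutputStream out = new GZIPOutputStream(new ByteBufOutputStream(buf))) {
            byte[] chunk = new byte[8192];
            int n;
            while ((n = message.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        }
        return buf; // ready to be framed and written to the channel
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello, triple".getBytes();
        ByteBuf buf = compressInto(new ByteArrayInputStream(payload));
        System.out.println("compressed bytes: " + buf.readableBytes());
        buf.release();
    }
}
```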

Changes:

  • Refactored compression pipeline to use InputStream/OutputStream decorators instead of byte arrays
  • Updated Compressor and DeCompressor interfaces to support streaming with new decorate() and decompress(InputStream) methods
  • Modified ClientStream API to accept InputStream instead of byte arrays for message sending
  • Implemented streaming decompression in GrpcStreamingDecoder to handle compressed messages without buffering

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.

Summary per file:

  • Compressor.java: Added streaming compression via decorate() method, deprecated byte[] compress()
  • DeCompressor.java: Added streaming decompression via decompress(InputStream), deprecated byte[] decompress()
  • Identity.java: Implemented pass-through streaming methods for identity (no compression)
  • Gzip.java: Implemented streaming decompression using GZIPInputStream
  • Snappy.java: Implemented streaming compression/decompression using Snappy streams
  • Bzip2.java: Implemented streaming decompression using BZip2CompressorInputStream
  • ClientStream.java: Changed sendMessage API from byte[]/int to InputStream/Compressor
  • AbstractTripleClientStream.java: Updated to use InputStream and estimate message size via available()
  • DataQueueCommand.java: Refactored to compress data directly into ByteBuf using the streaming API
  • GrpcStreamingDecoder.java: Updated to decompress messages on the fly without buffering
  • TripleClientCall.java: Updated to create a ByteArrayInputStream from serialized data
  • Stream.java: Updated documentation for messageLength to indicate -1 for unknown compressed size
  • StreamingDecoder.java: Updated documentation for the messageLength parameter
  • LengthFieldStreamingDecoder.java: Changed BoundedInputStream visibility to protected for reuse
  • WriteQueueTest.java: Updated test to use the new DataQueueCommand API
  • TripleClientStreamTest.java: Updated test to use the new sendMessage API


Comment on lines 86 to 91
try {
    // Compress and write data directly into ByteBuf using decorator pattern
    ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
    OutputStream compressedOut = compressor.decorate(bbos);
    StreamUtils.copy(dataStream, compressedOut);
    compressedOut.close();

Copilot AI Feb 5, 2026


The dataStream passed to this method is not closed after being read. This can lead to resource leaks, especially for file-based InputStreams or other InputStreams that hold system resources. Consider wrapping the stream operations in a try-with-resources block or explicitly closing the dataStream in the finally block after copying is complete.
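For illustration, a self-contained sketch of the pattern this comment asks for: close the source stream together with the compressing decorator in a single try-with-resources, so both are released on success and on failure. GZIPOutputStream and in-memory streams stand in for compressor.decorate(...) and ByteBufOutputStream; this is not the DataQueueCommand code itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

// Sketch only: GZIPOutputStream stands in for compressor.decorate(...),
// ByteArrayOutputStream for the ByteBufOutputStream sink.
public final class CloseSourceStreamSketch {

    static byte[] compress(InputStream source) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // try-with-resources closes the source and the decorator whether the
        // copy succeeds or throws, addressing the leak described above.
        try (InputStream in = source;
                OutputStream out = new GZIPOutputStream(sink)) {
            in.transferTo(out);
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] gz = compress(new ByteArrayInputStream("payload".getBytes()));
        System.out.println(gz.length + " bytes after gzip");
    }
}
```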

Comment on lines 95 to 99
} catch (Exception e) {
    buf.release();
    promise.setFailure(e);
    return;
}

Copilot AI Feb 5, 2026


When an exception occurs during compression/copying, the ByteBuf is released but the dataStream is not closed. This can lead to resource leaks. The dataStream should also be closed in the catch block to ensure proper cleanup of any system resources it may hold.

Comment on lines 77 to 85
  /**
-  * Send message to remote peer.
+  * Send message to remote peer with zero-copy compression.
+  * The compressor decorates the output stream to compress data directly into ByteBuf.
   *
-  * @param message message to send to remote peer
-  * @return future to callback when send message is done
+  * @param message raw message stream (uncompressed)
+  * @param compressor compressor to use for compression
+  * @return future to callback when send is done
   */
- Future<?> sendMessage(byte[] message, int compressFlag);
+ Future<?> sendMessage(InputStream message, Compressor compressor);

Copilot AI Feb 5, 2026


The sendMessage method signature has been changed from sendMessage(byte[], int) to sendMessage(InputStream, Compressor), which is a breaking API change. If ClientStream is considered a public or stable API, the old method should be deprecated first rather than removed directly, or this should be clearly documented as a breaking change in the PR description. If this is an internal-only API, consider adding package-private visibility or an @internal annotation to make this clear.
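One possible non-breaking shape for this is sketched below, assuming keeping the old signature is desirable (the PR itself does not do this): retain the byte[] overload as a deprecated default method that adapts to the streaming overload. The nested Compressor stub and its IDENTITY constant are placeholders for the real org.apache.dubbo.rpc.protocol.tri.compressor types.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.Future;

// Hypothetical sketch of a deprecate-then-remove migration path; not the
// actual ClientStream interface from the PR.
interface ClientStreamSketch {

    // Placeholder for org.apache.dubbo.rpc.protocol.tri.compressor.Compressor.
    interface Compressor {
        Compressor IDENTITY = new Compressor() {};
    }

    Future<?> sendMessage(InputStream message, Compressor compressor);

    /** @deprecated use {@link #sendMessage(InputStream, Compressor)} instead. */
    @Deprecated
    default Future<?> sendMessage(byte[] message, int compressFlag) {
        // A real adapter would map compressFlag to the negotiated compressor;
        // IDENTITY is used here only to keep the sketch self-contained.
        return sendMessage(new ByteArrayInputStream(message), Compressor.IDENTITY);
    }
}
```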

Comment on lines 88 to 91
ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
OutputStream compressedOut = compressor.decorate(bbos);
StreamUtils.copy(dataStream, compressedOut);
compressedOut.close();

Copilot AI Feb 5, 2026


This ByteBufOutputStream is not always closed on method exit.

Suggested change
- ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
- OutputStream compressedOut = compressor.decorate(bbos);
- StreamUtils.copy(dataStream, compressedOut);
- compressedOut.close();
+ try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+         OutputStream compressedOut = compressor.decorate(bbos)) {
+     StreamUtils.copy(dataStream, compressedOut);
+ }

@codecov-commenter

codecov-commenter commented Feb 5, 2026

Codecov Report

❌ Patch coverage is 70.83333% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.75%. Comparing base (678d2a7) to head (d5754b2).

Files with missing lines Patch % Lines
...bbo/rpc/protocol/tri/command/DataQueueCommand.java 65.21% 6 Missing and 2 partials ⚠️
.../dubbo/rpc/protocol/tri/call/TripleClientCall.java 77.77% 1 Missing and 1 partial ⚠️
...ache/dubbo/rpc/protocol/tri/compressor/Snappy.java 0.00% 2 Missing ⚠️
...pache/dubbo/rpc/protocol/tri/compressor/Bzip2.java 0.00% 1 Missing ⚠️
...he/dubbo/rpc/protocol/tri/compressor/Identity.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##                3.3   #16072      +/-   ##
============================================
+ Coverage     60.74%   60.75%   +0.01%     
- Complexity    11751    11757       +6     
============================================
  Files          1953     1953              
  Lines         89120    89148      +28     
  Branches      13443    13444       +1     
============================================
+ Hits          54132    54158      +26     
- Misses        29420    29422       +2     
  Partials       5568     5568              
Flag Coverage Δ
integration-tests-java21 32.15% <58.33%> (+0.03%) ⬆️
integration-tests-java8 32.28% <58.33%> (+0.02%) ⬆️
samples-tests-java21 32.14% <68.75%> (-0.03%) ⬇️
samples-tests-java8 29.73% <68.75%> (+0.01%) ⬆️
unit-tests-java11 59.00% <60.41%> (+<0.01%) ⬆️
unit-tests-java17 58.48% <60.41%> (+<0.01%) ⬆️
unit-tests-java21 58.50% <60.41%> (+0.02%) ⬆️
unit-tests-java25 58.46% <60.41%> (-0.01%) ⬇️
unit-tests-java8 59.00% <60.41%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

@EarthChen changed the title from "refactor: implement zero-copy compression for Triple client" to "refactor: implement zero-copy for Triple client" on Feb 5, 2026
@EarthChen requested a review from Copilot on February 5, 2026 at 09:42
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 7 comments.



  public OutputStream decorate(OutputStream outputStream) {
-     return outputStream;
+     // Snappy wraps output with SnappyOutputStream for streaming compression
+     return new org.xerial.snappy.SnappyOutputStream(outputStream);

Copilot AI Feb 5, 2026


The SnappyOutputStream constructor can throw IOException, but this is not being caught. While Gzip and Bzip2 properly wrap IOExceptions from their stream constructors in IllegalStateException (see lines 70-74 in Gzip.java and lines 76-80 in Bzip2.java), the Snappy implementation doesn't follow this pattern. This will cause a compilation error. Wrap the construction in a try-catch block and throw IllegalStateException, consistent with the other compressor implementations.

Suggested change
- return new org.xerial.snappy.SnappyOutputStream(outputStream);
+ try {
+     return new org.xerial.snappy.SnappyOutputStream(outputStream);
+ } catch (IOException e) {
+     throw new IllegalStateException(e);
+ }

Comment on lines +70 to +73
@Override
public InputStream decompress(InputStream inputStream) throws IOException {
    return new org.xerial.snappy.SnappyInputStream(inputStream);
}

Copilot AI Feb 5, 2026


The new streaming compression methods (decorate and decompress(InputStream)) are not directly unit tested. While the existing compressor tests exercise the deprecated byte[] methods and the integration tests (WriteQueueTest, TripleClientStreamTest) exercise the new paths, consider adding explicit unit tests for the streaming methods to ensure they properly handle edge cases like empty streams, IOException propagation, and proper resource cleanup. This is especially important since the deprecated methods will eventually be removed.
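A minimal round-trip check along the lines this comment suggests is sketched below. It assumes Gzip has a no-arg constructor and exposes decorate(OutputStream) and decompress(InputStream) as described in the PR summary; adjust the import and construction to the actual class if those assumptions do not hold.

```java
import static org.junit.jupiter.api.Assertions.assertArrayEquals;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.dubbo.rpc.protocol.tri.compressor.Gzip;
import org.junit.jupiter.api.Test;

// Sketch of a streaming round-trip test; class and method names are taken
// from the PR summary, not verified against the final diff.
class GzipStreamingRoundTripTest {

    @Test
    void compressThenDecompressRestoresPayload() throws Exception {
        Gzip gzip = new Gzip();
        byte[] payload = "streaming round trip".getBytes();

        // Compress through the decorator into an in-memory sink.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = gzip.decorate(sink)) {
            out.write(payload);
        }

        // Decompress through the streaming API and compare with the original.
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (InputStream in = gzip.decompress(new ByteArrayInputStream(sink.toByteArray()))) {
            in.transferTo(restored);
        }
        assertArrayEquals(payload, restored.toByteArray());
    }
}
```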

Comment on lines +94 to +96
public UnsafeByteArrayInputStream toInputStream() {
    return new UnsafeByteArrayInputStream(mBuffer, 0, mCount);
}

Copilot AI Feb 5, 2026


The new toInputStream() method in UnsafeByteArrayOutputStream is not tested. While integration tests may exercise this path, consider adding a unit test to verify that the returned InputStream correctly wraps the internal buffer without copying, properly reflects the current size (mCount), and can be read from the correct position.

Comment on lines +125 to +128
public void writeTo(OutputStream out) throws IOException {
    out.write(mData, mPosition, mLimit - mPosition);
    mPosition = mLimit;
}

Copilot AI Feb 5, 2026


The new writeTo(OutputStream) method in UnsafeByteArrayInputStream is not tested. While integration tests may exercise this path through StreamUtils.copy, consider adding a unit test to verify that: 1) data is written correctly from mPosition to mLimit, 2) mPosition is updated to mLimit after the write, and 3) the zero-copy behavior works correctly (writing directly from the internal buffer).
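The two suggestions above could be covered by tests along these lines. The sketch assumes both classes live in org.apache.dubbo.common.io and expose the methods shown in the hunks (toInputStream() and writeTo(OutputStream)); the constructors used here are assumptions based on common usage of these helpers.

```java
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;

import org.apache.dubbo.common.io.UnsafeByteArrayInputStream;
import org.apache.dubbo.common.io.UnsafeByteArrayOutputStream;
import org.junit.jupiter.api.Test;

// Sketch only: verifies the buffer-wrapping and drain behavior described in
// the two review comments above; adjust to the actual APIs if they differ.
class UnsafeByteArrayStreamSketchTest {

    @Test
    void toInputStreamReflectsWrittenBytes() throws Exception {
        byte[] payload = {1, 2, 3, 4};
        UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
        out.write(payload);

        InputStream in = out.toInputStream();
        byte[] read = new byte[payload.length];
        assertEquals(payload.length, in.read(read));
        assertArrayEquals(payload, read);
        assertEquals(-1, in.read()); // nothing beyond the written size is exposed
    }

    @Test
    void writeToDrainsRemainingBytes() throws Exception {
        byte[] payload = {5, 6, 7};
        UnsafeByteArrayInputStream in = new UnsafeByteArrayInputStream(payload);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();

        in.writeTo(sink);
        assertArrayEquals(payload, sink.toByteArray());
        assertEquals(0, in.available()); // position advanced to the limit
    }
}
```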

EarthChen and others added 3 commits February 5, 2026 17:51
…dubbo/remoting/http12/message/StreamingDecoder.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/call/ClientCall.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/stream/Stream.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@LI123456mo
Contributor

Hi @EarthChen, I see you have used Stream (or is it DataStream?). In my previous PR, I tried to use ByteBuf, the Netty concept. Can you tell me why that failed to achieve zero copy?

