refactor: implement zero-copy for Triple client #16072

EarthChen wants to merge 6 commits into apache:3.3
Conversation
Pull request overview
This pull request refactors the Triple client to implement zero-copy compression, eliminating the need to buffer entire messages in memory before compression. The change moves from a byte-array-based compression model to a streaming model where data is compressed directly into Netty ByteBuf instances.
Changes:
- Refactored compression pipeline to use InputStream/OutputStream decorators instead of byte arrays
- Updated Compressor and DeCompressor interfaces to support streaming with new decorate() and decompress(InputStream) methods
- Modified ClientStream API to accept InputStream instead of byte arrays for message sending (a minimal sketch of the streaming flow follows this list)
- Implemented streaming decompression in GrpcStreamingDecoder to handle compressed messages without buffering
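
For reference, here is a minimal, self-contained sketch of the streaming idea (not the PR's actual code): `java.util.zip.GZIPOutputStream` stands in for `Compressor.decorate(...)`, and the copy loop stands in for Dubbo's `StreamUtils.copy`, so compressed bytes land directly in a Netty `ByteBuf` without an intermediate compressed `byte[]`.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

public class StreamingCompressionSketch {

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello, triple".getBytes();
        InputStream dataStream = new ByteArrayInputStream(payload);

        ByteBuf buf = Unpooled.buffer();
        // Decorate the ByteBuf-backed stream; GZIPOutputStream plays the role of
        // compressor.decorate(bbos) in the PR, so compressed output goes straight into buf.
        try (OutputStream compressedOut = new GZIPOutputStream(new ByteBufOutputStream(buf))) {
            byte[] chunk = new byte[8192];
            int n;
            while ((n = dataStream.read(chunk)) != -1) {
                compressedOut.write(chunk, 0, n);
            }
        }
        System.out.println("compressed bytes in ByteBuf: " + buf.readableBytes());
        buf.release();
    }
}
```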
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| Compressor.java | Added streaming compression via decorate() method, deprecated byte[] compress() |
| DeCompressor.java | Added streaming decompression via decompress(InputStream), deprecated byte[] decompress() |
| Identity.java | Implemented pass-through streaming methods for identity (no compression) |
| Gzip.java | Implemented streaming decompression using GZIPInputStream |
| Snappy.java | Implemented streaming compression/decompression using Snappy streams |
| Bzip2.java | Implemented streaming decompression using BZip2CompressorInputStream |
| ClientStream.java | Changed sendMessage API from byte[]/int to InputStream/Compressor |
| AbstractTripleClientStream.java | Updated to use InputStream and estimate message size via available() |
| DataQueueCommand.java | Refactored to compress data directly into ByteBuf using streaming API |
| GrpcStreamingDecoder.java | Updated to decompress messages on-the-fly without buffering |
| TripleClientCall.java | Updated to create ByteArrayInputStream from serialized data |
| Stream.java | Updated documentation for messageLength to indicate -1 for unknown compressed size |
| StreamingDecoder.java | Updated documentation for messageLength parameter |
| LengthFieldStreamingDecoder.java | Changed BoundedInputStream visibility to protected for reuse |
| WriteQueueTest.java | Updated test to use new DataQueueCommand API |
| TripleClientStreamTest.java | Updated test to use new sendMessage API |
    try {
        // Compress and write data directly into ByteBuf using decorator pattern
        ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
        OutputStream compressedOut = compressor.decorate(bbos);
        StreamUtils.copy(dataStream, compressedOut);
        compressedOut.close();
The dataStream passed to this method is not closed after being read. This can lead to resource leaks, especially for file-based InputStreams or other InputStreams that hold system resources. Consider wrapping the stream operations in a try-with-resources block or explicitly closing the dataStream in the finally block after copying is complete.
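
A possible shape of that fix, sketched against the names in the quoted snippet (so this is an assumption about the surrounding method, not the PR's final code). Closing `dataStream` in a `finally` block covers both the success path here and the exception path discussed in the next comment.

```java
// Sketch only: dataStream, compressor, buf, and StreamUtils.copy are the names
// from the quoted snippet above; this fragment is not the actual PR change.
try {
    ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
    OutputStream compressedOut = compressor.decorate(bbos);
    StreamUtils.copy(dataStream, compressedOut);
    compressedOut.close();
} finally {
    // Release file handles or other system resources held by the source stream.
    dataStream.close();
}
```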
...riple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
    } catch (Exception e) {
        buf.release();
        promise.setFailure(e);
        return;
    }
When an exception occurs during compression/copying, the ByteBuf is released but the dataStream is not closed. This can lead to resource leaks. The dataStream should also be closed in the catch block to ensure proper cleanup of any system resources it may hold.
     /**
    - * Send message to remote peer.
    + * Send message to remote peer with zero-copy compression.
    + * The compressor decorates the output stream to compress data directly into ByteBuf.
      *
    - * @param message message to send to remote peer
    - * @return future to callback when send message is done
    + * @param message    raw message stream (uncompressed)
    + * @param compressor compressor to use for compression
    + * @return future to callback when send is done
      */
    -Future<?> sendMessage(byte[] message, int compressFlag);
    +Future<?> sendMessage(InputStream message, Compressor compressor);
The sendMessage method signature has been changed from sendMessage(byte[], int) to sendMessage(InputStream, Compressor), which is a breaking API change. If ClientStream is considered a public or stable API, the old method should be deprecated first rather than removed directly, or this should be clearly documented as a breaking change in the PR description. If this is an internal-only API, consider adding package-private visibility or an @internal annotation to make this clear.
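
If a gentler migration were wanted, one hypothetical option is to keep the old byte[] signature as a deprecated default method that adapts to the new streaming one. In the sketch below, the `Compressor` stub and the `resolveCompressor(...)` helper are placeholders for illustration only, not existing Dubbo APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Future;

// Hypothetical migration sketch, not the PR's actual interfaces.
interface Compressor {
    OutputStream decorate(OutputStream out);
}

interface ClientStream {

    Future<?> sendMessage(InputStream message, Compressor compressor);

    /** Maps the old compressFlag to a Compressor; a placeholder, not a real Dubbo method. */
    Compressor resolveCompressor(int compressFlag);

    /**
     * @deprecated kept only for source/binary compatibility; use
     *             {@link #sendMessage(InputStream, Compressor)} instead.
     */
    @Deprecated
    default Future<?> sendMessage(byte[] message, int compressFlag) {
        return sendMessage(new ByteArrayInputStream(message), resolveCompressor(compressFlag));
    }
}
```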
    ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
    OutputStream compressedOut = compressor.decorate(bbos);
    StreamUtils.copy(dataStream, compressedOut);
    compressedOut.close();
This ByteBufOutputStream is not always closed on method exit.
Suggested change:

    -ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
    -OutputStream compressedOut = compressor.decorate(bbos);
    -StreamUtils.copy(dataStream, compressedOut);
    -compressedOut.close();
    +try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
    +        OutputStream compressedOut = compressor.decorate(bbos)) {
    +    StreamUtils.copy(dataStream, compressedOut);
    +}
Codecov Report ❌

Additional details and impacted files:

    @@             Coverage Diff              @@
    ##                3.3    #16072     +/-   ##
    =============================================
    + Coverage     60.74%    60.75%    +0.01%
    - Complexity    11751     11757        +6
    =============================================
      Files          1953      1953
      Lines         89120     89148       +28
      Branches      13443     13444        +1
    =============================================
    + Hits          54132     54158       +26
    - Misses        29420     29422        +2
      Partials       5568      5568
Flags with carried forward coverage won't be shown. View the full report in Codecov by Sentry.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 7 comments.
     public OutputStream decorate(OutputStream outputStream) {
    -    return outputStream;
    +    // Snappy wraps output with SnappyOutputStream for streaming compression
    +    return new org.xerial.snappy.SnappyOutputStream(outputStream);
The SnappyOutputStream constructor can throw IOException, but this is not being caught. While Gzip and Bzip2 properly wrap IOExceptions from their stream constructors in IllegalStateException (see lines 70-74 in Gzip.java and lines 76-80 in Bzip2.java), the Snappy implementation doesn't follow this pattern. This will cause a compilation error. Wrap the construction in a try-catch block and throw IllegalStateException, consistent with the other compressor implementations.
Suggested change:

    -return new org.xerial.snappy.SnappyOutputStream(outputStream);
    +try {
    +    return new org.xerial.snappy.SnappyOutputStream(outputStream);
    +} catch (IOException e) {
    +    throw new IllegalStateException(e);
    +}
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/Stream.java
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
...remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java
    @Override
    public InputStream decompress(InputStream inputStream) throws IOException {
        return new org.xerial.snappy.SnappyInputStream(inputStream);
    }
The new streaming compression methods (decorate and decompress(InputStream)) are not directly unit tested. While the existing compressor tests exercise the deprecated byte[] methods and the integration tests (WriteQueueTest, TripleClientStreamTest) exercise the new paths, consider adding explicit unit tests for the streaming methods to ensure they properly handle edge cases like empty streams, IOException propagation, and proper resource cleanup. This is especially important since the deprecated methods will eventually be removed.
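
One possible shape for such a test, assuming JUnit 5 and that the Gzip compressor exposes the new decorate(OutputStream) and decompress(InputStream) methods added in this PR (the test is assumed to live next to the Gzip class so no extra import is needed):

```java
import static org.junit.jupiter.api.Assertions.assertArrayEquals;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.junit.jupiter.api.Test;

class GzipStreamingTest {

    @Test
    void roundTripThroughStreamingApi() throws Exception {
        byte[] original = "streaming zero-copy payload".getBytes();
        Gzip gzip = new Gzip();

        // Compress via the decorator added in this PR.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (OutputStream out = gzip.decorate(compressed)) {
            out.write(original);
        }

        // Decompress via the streaming method and collect the result.
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (InputStream in = gzip.decompress(new ByteArrayInputStream(compressed.toByteArray()))) {
            byte[] buf = new byte[256];
            int n;
            while ((n = in.read(buf)) != -1) {
                restored.write(buf, 0, n);
            }
        }
        assertArrayEquals(original, restored.toByteArray());
    }
}
```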
    public UnsafeByteArrayInputStream toInputStream() {
        return new UnsafeByteArrayInputStream(mBuffer, 0, mCount);
    }
The new toInputStream() method in UnsafeByteArrayOutputStream is not tested. While integration tests may exercise this path, consider adding a unit test to verify that the returned InputStream correctly wraps the internal buffer without copying, properly reflects the current size (mCount), and can be read from the correct position.
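
A small test along those lines might look like this (assuming JUnit 5, the existing UnsafeByteArrayOutputStream(int) constructor, and that available() on the returned stream reports the remaining bytes):

```java
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.InputStream;

import org.junit.jupiter.api.Test;

class UnsafeByteArrayOutputStreamToInputStreamTest {

    @Test
    void returnedStreamReflectsCurrentSize() throws Exception {
        UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream(32);
        out.write(new byte[] {1, 2, 3});

        InputStream in = out.toInputStream();
        assertEquals(3, in.available()); // size follows mCount, not the buffer capacity

        byte[] read = new byte[3];
        assertEquals(3, in.read(read));
        assertArrayEquals(new byte[] {1, 2, 3}, read); // reads start at position 0 of the buffer
    }
}
```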
    public void writeTo(OutputStream out) throws IOException {
        out.write(mData, mPosition, mLimit - mPosition);
        mPosition = mLimit;
    }
The new writeTo(OutputStream) method in UnsafeByteArrayInputStream is not tested. While integration tests may exercise this path through StreamUtils.copy, consider adding a unit test to verify that: 1) data is written correctly from mPosition to mLimit, 2) mPosition is updated to mLimit after the write, and 3) the zero-copy behavior works correctly (writing directly from the internal buffer).
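
A companion test for writeTo could check the two observable effects named above (again assuming JUnit 5 and the existing UnsafeByteArrayInputStream(byte[]) constructor):

```java
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.ByteArrayOutputStream;

import org.junit.jupiter.api.Test;

class UnsafeByteArrayInputStreamWriteToTest {

    @Test
    void writesRemainingBytesAndAdvancesPosition() throws Exception {
        byte[] data = {10, 20, 30, 40};
        UnsafeByteArrayInputStream in = new UnsafeByteArrayInputStream(data);
        assertEquals(10, in.read()); // consume one byte so the position is past the start

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        in.writeTo(sink);

        assertArrayEquals(new byte[] {20, 30, 40}, sink.toByteArray()); // only the remaining range is written
        assertEquals(0, in.available());                                // position advanced to the limit
    }
}
```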
…dubbo/remoting/http12/message/StreamingDecoder.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/call/ClientCall.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/stream/Stream.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Hi @EarthChen, I see you have used Stream (or is it DataStream?). In my previous PR, I tried to use ByteBuf, the Netty concept; can you tell me why that approach was failing to achieve zero copy?
What is the purpose of the change?
Checklist