Skip to content

Commit 133ec68

Browse files
committed
Add async Brotli support (compress/decompress).
Introduce DeflatingBrotliAsyncEntityProducer and InflatingBrotliAsyncDataConsumer; register "br" when brotli4j is available.
1 parent 3eda509 commit 133ec68

10 files changed

Lines changed: 1123 additions & 1 deletion

File tree

httpclient5/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@
118118
<artifactId>zstd-jni</artifactId>
119119
<optional>true</optional>
120120
</dependency>
121+
<dependency>
122+
<groupId>com.aayushatharva.brotli4j</groupId>
123+
<artifactId>brotli4j</artifactId>
124+
</dependency>
121125
</dependencies>
122126

123127
<build>
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.async.methods;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.Collections;
32+
import java.util.List;
33+
import java.util.Set;
34+
35+
import com.aayushatharva.brotli4j.encoder.Encoder;
36+
import com.aayushatharva.brotli4j.encoder.EncoderJNI;
37+
38+
import org.apache.hc.core5.http.Header;
39+
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
40+
import org.apache.hc.core5.http.nio.DataStreamChannel;
41+
import org.apache.hc.core5.util.Args;
42+
43+
/**
44+
* {@code AsyncEntityProducer} that Brotli-compresses bytes from an upstream producer
45+
* on the fly and writes the compressed stream to the target {@link DataStreamChannel}.
46+
* <p>
47+
* Purely async/streaming: no {@code InputStream}/{@code OutputStream}. Back-pressure is
48+
* honored via {@link #available()} and the I/O reactor’s calls into {@link #produce(DataStreamChannel)}.
49+
* Trailers from the upstream producer are preserved and emitted once the compressed output
50+
* has been fully drained.
51+
* </p>
52+
*
53+
* <h4>Content metadata</h4>
54+
* Returns {@code Content-Encoding: br}, {@code Content-Length: -1} and {@code chunked=true}.
55+
* Repeatability matches the upstream producer.
56+
*
57+
* <h4>Implementation notes</h4>
58+
* Uses Brotli4j’s {@code EncoderJNI.Wrapper}. JNI-owned output buffers are written directly
59+
* when possible; if the channel applies back-pressure, the unwritten tail is copied into
60+
* small pooled direct {@link java.nio.ByteBuffer}s to reduce allocation churn. Native
61+
* resources are released in {@link #releaseResources()}.
62+
* <p>
63+
* Ensure {@link com.aayushatharva.brotli4j.Brotli4jLoader#ensureAvailability()} has been
64+
* called once at startup; this class also invokes it in a static initializer as a safeguard.
65+
* </p>
66+
*
67+
* <h4>Usage</h4>
68+
* <pre>{@code
69+
* AsyncEntityProducer plain = new StringAsyncEntityProducer("hello", ContentType.TEXT_PLAIN);
70+
* AsyncEntityProducer br = new DeflatingBrotliEntityProducer(plain); // defaults q=5, lgwin=22
71+
* client.execute(new BasicRequestProducer(post, br),
72+
* new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
73+
* null);
74+
* }</pre>
75+
*
76+
* @see org.apache.hc.core5.http.nio.AsyncEntityProducer
77+
* @see org.apache.hc.core5.http.nio.DataStreamChannel
78+
* @see com.aayushatharva.brotli4j.encoder.EncoderJNI
79+
* @since 5.6
80+
*/
81+
public final class DeflatingBrotliEntityProducer implements AsyncEntityProducer {
82+
83+
private enum State { STREAMING, FINISHING, DONE }
84+
85+
private final AsyncEntityProducer upstream;
86+
private final EncoderJNI.Wrapper encoder;
87+
88+
private ByteBuffer pendingOut;
89+
private List<? extends Header> pendingTrailers;
90+
private State state = State.STREAMING;
91+
92+
/**
93+
* Create a producer with explicit Brotli params.
94+
*
95+
* @param upstream upstream entity producer whose bytes will be compressed
96+
* @param quality Brotli quality level (see brotli4j documentation)
97+
* @param lgwin Brotli window size log2 (see brotli4j documentation)
98+
* @param mode Brotli mode hint (GENERIC/TEXT/FONT)
99+
* @throws IOException if the native encoder cannot be created
100+
* @since 5.6
101+
*/
102+
public DeflatingBrotliEntityProducer(
103+
final AsyncEntityProducer upstream,
104+
final int quality,
105+
final int lgwin,
106+
final Encoder.Mode mode) throws IOException {
107+
this.upstream = Args.notNull(upstream, "upstream");
108+
this.encoder = new EncoderJNI.Wrapper(256 * 1024, quality, lgwin, mode);
109+
}
110+
111+
/**
112+
* Convenience constructor mapping {@code 0=GENERIC, 1=TEXT, 2=FONT}.
113+
*
114+
* @since 5.6
115+
*/
116+
public DeflatingBrotliEntityProducer(
117+
final AsyncEntityProducer upstream,
118+
final int quality,
119+
final int lgwin,
120+
final int modeInt) throws IOException {
121+
this(upstream, quality, lgwin,
122+
modeInt == 1 ? Encoder.Mode.TEXT :
123+
modeInt == 2 ? Encoder.Mode.FONT : Encoder.Mode.GENERIC);
124+
}
125+
126+
/**
127+
* Create a producer with sensible defaults ({@code quality=5}, {@code lgwin=22}, {@code GENERIC}).
128+
*
129+
* @since 5.6
130+
*/
131+
public DeflatingBrotliEntityProducer(final AsyncEntityProducer upstream) throws IOException {
132+
this(upstream, 5, 22, Encoder.Mode.GENERIC);
133+
}
134+
135+
136+
@Override
137+
public String getContentType() {
138+
return upstream.getContentType();
139+
}
140+
141+
@Override
142+
public String getContentEncoding() {
143+
return "br";
144+
}
145+
146+
@Override
147+
public long getContentLength() {
148+
return -1;
149+
}
150+
151+
@Override
152+
public boolean isChunked() {
153+
return true;
154+
}
155+
156+
@Override
157+
public Set<String> getTrailerNames() {
158+
return upstream.getTrailerNames();
159+
}
160+
161+
@Override
162+
public boolean isRepeatable() {
163+
return upstream.isRepeatable();
164+
}
165+
166+
@Override
167+
public int available() {
168+
if (state == State.DONE) {
169+
return 0;
170+
}
171+
if (pendingOut != null && pendingOut.hasRemaining() || pendingTrailers != null) {
172+
return 1;
173+
}
174+
final int up = upstream.available();
175+
return (state != State.STREAMING || up > 0) ? 1 : 0;
176+
}
177+
178+
@Override
179+
public void produce(final DataStreamChannel channel) throws IOException {
180+
if (flushPending(channel)) {
181+
return;
182+
}
183+
184+
if (state == State.FINISHING) {
185+
encoder.push(EncoderJNI.Operation.FINISH, 0);
186+
if (drainEncoder(channel)) {
187+
return;
188+
}
189+
if (pendingTrailers == null) {
190+
pendingTrailers = Collections.emptyList();
191+
}
192+
channel.endStream(pendingTrailers);
193+
pendingTrailers = null;
194+
state = State.DONE;
195+
return;
196+
}
197+
198+
upstream.produce(new DataStreamChannel() {
199+
@Override
200+
public void requestOutput() {
201+
channel.requestOutput();
202+
}
203+
204+
@Override
205+
public int write(final ByteBuffer src) throws IOException {
206+
int accepted = 0;
207+
while (src.hasRemaining()) {
208+
final ByteBuffer in = encoder.getInputBuffer();
209+
if (!in.hasRemaining()) {
210+
encoder.push(EncoderJNI.Operation.PROCESS, 0);
211+
if (drainEncoder(channel)) {
212+
break;
213+
}
214+
continue;
215+
}
216+
final int xfer = Math.min(src.remaining(), in.remaining());
217+
final int lim = src.limit();
218+
src.limit(src.position() + xfer);
219+
in.put(src);
220+
src.limit(lim);
221+
accepted += xfer;
222+
223+
encoder.push(EncoderJNI.Operation.PROCESS, xfer);
224+
if (drainEncoder(channel)) {
225+
break;
226+
}
227+
}
228+
return accepted;
229+
}
230+
231+
@Override
232+
public void endStream() throws IOException {
233+
endStream(Collections.emptyList());
234+
}
235+
236+
@Override
237+
public void endStream(final List<? extends Header> trailers) throws IOException {
238+
pendingTrailers = trailers;
239+
state = State.FINISHING;
240+
encoder.push(EncoderJNI.Operation.FINISH, 0);
241+
if (drainEncoder(channel)) {
242+
return;
243+
}
244+
if (pendingTrailers == null) {
245+
pendingTrailers = Collections.emptyList();
246+
}
247+
channel.endStream(pendingTrailers);
248+
pendingTrailers = null;
249+
state = State.DONE;
250+
}
251+
});
252+
}
253+
254+
@Override
255+
public void failed(final Exception cause) {
256+
upstream.failed(cause);
257+
}
258+
259+
@Override
260+
public void releaseResources() {
261+
try {
262+
encoder.destroy();
263+
} catch (final Throwable ignore) {
264+
}
265+
upstream.releaseResources();
266+
pendingOut = null;
267+
pendingTrailers = null;
268+
state = State.DONE;
269+
}
270+
271+
272+
private boolean flushPending(final DataStreamChannel channel) throws IOException {
273+
if (pendingOut != null && pendingOut.hasRemaining()) {
274+
channel.write(pendingOut);
275+
if (pendingOut.hasRemaining()) {
276+
channel.requestOutput();
277+
return true;
278+
}
279+
pendingOut = null;
280+
}
281+
if (pendingOut == null && pendingTrailers != null && state != State.STREAMING) {
282+
channel.endStream(pendingTrailers);
283+
pendingTrailers = null;
284+
state = State.DONE;
285+
return true;
286+
}
287+
return false;
288+
}
289+
290+
private boolean drainEncoder(final DataStreamChannel channel) throws IOException {
291+
while (encoder.hasMoreOutput()) {
292+
final ByteBuffer buf = encoder.pull();
293+
if (buf == null || !buf.hasRemaining()) {
294+
continue;
295+
}
296+
channel.write(buf);
297+
if (buf.hasRemaining()) {
298+
pendingOut = ByteBuffer.allocateDirect(buf.remaining());
299+
pendingOut.put(buf).flip();
300+
channel.requestOutput();
301+
return true;
302+
}
303+
}
304+
return false;
305+
}
306+
}

0 commit comments

Comments
 (0)