Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,27 @@ public class ProcessedContent implements Serializable {
Map<String, String> metadata = new HashMap<>();

/**
* the actual content
* the actual content; may be null when the upstream response has no body (e.g. 204, HEAD)
*/
@Getter(lombok.AccessLevel.NONE)
byte[] content;

/**
* Returns content bytes for consumers that need to read or write the body.
* Missing bodies are treated as an empty array to avoid NPEs in output writers.
*/
public byte[] getContent() {
return content != null ? content : new byte[0];
}

/**
* for convenience, a method to get the content as a string - rather than byte array
* @return the content as a string, using the specified contentCharset
* @return the content as a string, using the specified contentCharset; null if no body
*/
public String getContentAsString() {
if (getContent() == null) {
if (content == null) {
return null;
} else {
return new String(getContent(), contentCharset);
}
return new String(content, contentCharset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ public void write(ProcessedContent content) throws WriteFailure {
@Override
public void write(String key, ProcessedContent content) throws WriteFailure {
try {
if (!Objects.equals(COMPRESSION_TYPE, content.getContentEncoding())) {
byte[] rawContent = content.getContent();
if (!Objects.equals(COMPRESSION_TYPE, content.getContentEncoding())) {
Comment on lines 35 to +37
log.info("Compressing response with gzip encoding through wrapper");
byte[] compressedContent = gzipContent(content.getContent());
byte[] compressedContent = gzipContent(rawContent);
content = content.withContentEncoding(COMPRESSION_TYPE).withContent(compressedContent);
}
delegate.write(key, content);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package co.worklytics.psoxy.gateway.impl.output;

import co.worklytics.psoxy.gateway.ProcessedContent;
import co.worklytics.psoxy.gateway.output.Output;

import org.junit.jupiter.api.Test;

import java.io.BufferedReader;
Expand All @@ -13,6 +16,25 @@

class CompressedOutputWrapperTest {

@Test
void writeNullContent() throws Exception {
Output delegate = new NoOutput();
CompressedOutputWrapper wrapper = CompressedOutputWrapper.wrap(delegate);
ProcessedContent content = ProcessedContent.builder().content(null).build();
assertDoesNotThrow(() -> wrapper.write(content));
}

@Test
void writeNullContentAlreadyGzipEncoded() throws Exception {
Output delegate = new NoOutput();
CompressedOutputWrapper wrapper = CompressedOutputWrapper.wrap(delegate);
ProcessedContent content = ProcessedContent.builder()
.content(null)
.contentEncoding(CompressedOutputWrapper.COMPRESSION_TYPE)
.build();
assertDoesNotThrow(() -> wrapper.write(content));
}

@Test
void gzipContent() throws Exception {
String content = "Hello, world!";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ public S3Output(@Assisted OutputLocation location) {

@Override
public void write(String key, ProcessedContent content) throws WriteFailure {
byte[] body = content.getContent();

if (key == null) {
key = DigestUtils.md5Hex(content.getContent());
key = DigestUtils.md5Hex(body);
}

try {
Expand All @@ -66,14 +67,14 @@ public void write(String key, ProcessedContent content) throws WriteFailure {
PutObjectRequest.Builder putBuilder = PutObjectRequest.builder()
.bucket(bucket)
.key(pathPrefix + key)
.contentLength((long) content.getContent().length)
.contentLength((long) body.length)
.metadata(userMetadata);

// s3 client blows up if these are filled with 'null' values, so only set if present
Optional.ofNullable(content.getContentEncoding()).ifPresent(putBuilder::contentEncoding);
Optional.ofNullable(content.getContentType()).ifPresent(putBuilder::contentType);

s3Client.putObject(putBuilder.build(), RequestBody.fromBytes(content.getContent()));
s3Client.putObject(putBuilder.build(), RequestBody.fromBytes(body));
} catch (Exception e) {
throw new WriteFailure("Failed to write to S3 output", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ public GCSOutput(@Assisted OutputLocation location) {

@Override
public void write(String key, ProcessedContent content) throws WriteFailure {
byte[] body = content.getContent();

if (key == null) {
key = DigestUtils.md5Hex(content.getContent());
key = DigestUtils.md5Hex(body);
}

try {
Expand All @@ -58,7 +60,9 @@ public void write(String key, ProcessedContent content) throws WriteFailure {
.setContentEncoding(content.getContentEncoding())
.setMetadata(metadata)
.build())) {
writeChannel.write(java.nio.ByteBuffer.wrap(content.getContent(), 0, content.getContent().length));
if (body.length > 0) {
writeChannel.write(java.nio.ByteBuffer.wrap(body));
}
}
} catch (Exception e) {
log.log(Level.WARNING, "Failed to write to GCS sideOutput", e);
Expand Down
Loading