Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.numaproj.numaflow.sourcer.AckRequest;
import io.numaproj.numaflow.sourcer.Message;
import io.numaproj.numaflow.sourcer.NackOffset;
import io.numaproj.numaflow.sourcer.NackRequest;
import io.numaproj.numaflow.sourcer.Offset;
import io.numaproj.numaflow.sourcer.OutputObserver;
Expand Down Expand Up @@ -86,8 +87,8 @@ public void ack(AckRequest request) {
@Override
public void nack(NackRequest request) {
// put them to nacked offsets so that they will be retried immediately.
for (Offset offset : request.getOffsets()) {
Integer decoded_offset = ByteBuffer.wrap(offset.getValue()).getInt();
for (NackOffset offset : request.getOffsets()) {
Integer decoded_offset = ByteBuffer.wrap(offset.getOffset().getValue()).getInt();
yetToBeAcked.remove(decoded_offset);
nacked.put(decoded_offset, true);
readIndex.decrementAndGet();
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/Message.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package io.numaproj.numaflow.batchmapper;

import io.numaproj.numaflow.shared.NackOptions;
import lombok.Getter;

/** Message is used to wrap the data returned by Mapper. */
@Getter
public class Message {
private static final String[] DROP_TAGS = {"U+005C__DROP__"};
private static final String[] NACK_TAGS = {"U+005C__NACK__"};
private final String[] keys;
private final byte[] value;
private final String[] tags;
private final NackOptions nackOptions;

/**
* used to create Message with value, keys and tags(used for conditional forwarding)
Expand All @@ -18,10 +21,15 @@ public class Message {
* @param tags message tags which will be used for conditional forwarding
*/
public Message(byte[] value, String[] keys, String[] tags) {
this(value, keys, tags, (NackOptions) null);
}

private Message(byte[] value, String[] keys, String[] tags, NackOptions nackOptions) {
// defensive copy - once the Message is created, the caller should not be able to modify it.
this.keys = keys == null ? null : keys.clone();
this.value = value == null ? null : value.clone();
this.tags = tags == null ? null : tags.clone();
this.nackOptions = nackOptions;
}

/**
Expand Down Expand Up @@ -51,4 +59,14 @@ public Message(byte[] value, String[] keys) {
public static Message toDrop() {
return new Message(new byte[0], null, DROP_TAGS);
}

/**
* creates a Message that negatively acknowledges the input message, requesting redelivery.
*
* @param nackOptions optional redelivery options (may be null)
* @return the Message which will be nacked
*/
public static Message toNack(NackOptions nackOptions) {
return new Message(new byte[0], null, NACK_TAGS, nackOptions);
}
}
21 changes: 9 additions & 12 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,15 @@ private void buildAndStreamResponse(
responses.getItems().forEach(message -> {
List<MapOuterClass.MapResponse.Result> mapResponseResult = new ArrayList<>();
message.getItems().forEach(res -> {
mapResponseResult.add(
MapOuterClass.MapResponse.Result
.newBuilder()
.setValue(res.getValue()
== null ? ByteString.EMPTY : ByteString.copyFrom(
res.getValue()))
.addAllKeys(res.getKeys()
== null ? new ArrayList<>() : Arrays.asList(res.getKeys()))
.addAllTags(res.getTags()
== null ? new ArrayList<>() : Arrays.asList(res.getTags()))
.build()
);
MapOuterClass.MapResponse.Result.Builder resultBuilder = MapOuterClass.MapResponse.Result
.newBuilder()
.setValue(res.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(res.getValue()))
.addAllKeys(res.getKeys() == null ? new ArrayList<>() : Arrays.asList(res.getKeys()))
.addAllTags(res.getTags() == null ? new ArrayList<>() : Arrays.asList(res.getTags()));
if (res.getNackOptions() != null) {
resultBuilder.setNackOptions(res.getNackOptions().toProto());
}
mapResponseResult.add(resultBuilder.build());
});
MapOuterClass.MapResponse singleRequestResponse = MapOuterClass.MapResponse
.newBuilder()
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/io/numaproj/numaflow/mapper/MapperActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,19 @@ private MapOuterClass.MapResponse buildResponse(MessageList messageList, String
.newBuilder();

messageList.getMessages().forEach(message -> {
responseBuilder.addResults(MapOuterClass.MapResponse.Result.newBuilder()
MapOuterClass.MapResponse.Result.Builder resultBuilder = MapOuterClass.MapResponse.Result.newBuilder()
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.setMetadata(message.getUserMetadata()
== null ? MetadataOuterClass.Metadata.getDefaultInstance() : message.getUserMetadata().toProto())
.build());
== null ? MetadataOuterClass.Metadata.getDefaultInstance() : message.getUserMetadata().toProto());
if (message.getNackOptions() != null) {
resultBuilder.setNackOptions(message.getNackOptions().toProto());
}
responseBuilder.addResults(resultBuilder.build());
});
return responseBuilder.setId(ID).build();
}
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapper/Message.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.mapper;

import io.numaproj.numaflow.shared.NackOptions;
import io.numaproj.numaflow.shared.UserMetadata;
import lombok.Getter;

Expand All @@ -11,10 +12,12 @@
@Getter
public class Message {
private static final String[] DROP_TAGS = {"U+005C__DROP__"};
private static final String[] NACK_TAGS = {"U+005C__NACK__"};
private final String[] keys;
private final byte[] value;
private final String[] tags;
private final UserMetadata userMetadata;
private final NackOptions nackOptions;

/**
* used to create Message with value, keys, tags(used for conditional forwarding) and userMetadata
Expand All @@ -25,12 +28,17 @@ public class Message {
* @param userMetadata user metadata, this is used to pass user defined metadata to the next vertex
*/
public Message(byte[] value, String[] keys, String[] tags, UserMetadata userMetadata) {
this(value, keys, tags, userMetadata, null);
}

private Message(byte[] value, String[] keys, String[] tags, UserMetadata userMetadata, NackOptions nackOptions) {
// defensive copy - once the Message is created, the caller should not be able to modify it.
this.keys = keys == null ? null : keys.clone();
this.value = value == null ? null : value.clone();
this.tags = tags == null ? null : tags.clone();
// Copy the data using copy constructor to prevent mutation
this.userMetadata = userMetadata == null ? null : new UserMetadata(userMetadata);
this.nackOptions = nackOptions;
}

/**
Expand Down Expand Up @@ -71,4 +79,14 @@ public Message(byte[] value, String[] keys, String[] tags) {
public static Message toDrop() {
return new Message(new byte[0], null, DROP_TAGS, null);
}

/**
* creates a Message that negatively acknowledges the input message, requesting redelivery.
*
* @param nackOptions optional redelivery options (may be null)
* @return the Message which will be nacked
*/
public static Message toNack(NackOptions nackOptions) {
return new Message(new byte[0], null, NACK_TAGS, null, nackOptions);
}
}
18 changes: 18 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Message.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package io.numaproj.numaflow.mapstreamer;

import io.numaproj.numaflow.shared.NackOptions;
import lombok.Getter;

/** Message is used to wrap the data returned by MapStreamer. */
@Getter
public class Message {
private static final String[] DROP_TAGS = {"U+005C__DROP__"};
private static final String[] NACK_TAGS = {"U+005C__NACK__"};
private final String[] keys;
private final byte[] value;
private final String[] tags;
private final NackOptions nackOptions;

/**
* used to create Message with value, keys and tags(used for conditional forwarding)
Expand All @@ -18,10 +21,15 @@ public class Message {
* @param tags message tags which will be used for conditional forwarding
*/
public Message(byte[] value, String[] keys, String[] tags) {
this(value, keys, tags, (NackOptions) null);
}

private Message(byte[] value, String[] keys, String[] tags, NackOptions nackOptions) {
// defensive copy - once the Message is created, the caller should not be able to modify it.
this.keys = keys == null ? null : keys.clone();
this.value = value == null ? null : value.clone();
this.tags = tags == null ? null : tags.clone();
this.nackOptions = nackOptions;
}

/**
Expand Down Expand Up @@ -51,4 +59,14 @@ public Message(byte[] value, String[] keys) {
public static Message toDrop() {
return new Message(new byte[0], null, DROP_TAGS);
}

/**
* creates a Message that negatively acknowledges the input message, requesting redelivery.
*
* @param nackOptions optional redelivery options (may be null)
* @return the Message which will be nacked
*/
public static Message toNack(NackOptions nackOptions) {
return new Message(new byte[0], null, NACK_TAGS, nackOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ public void send(Message message) {
if (message == null) {
return;
}
MapOuterClass.MapResponse.Result.Builder resultBuilder = MapOuterClass.MapResponse.Result.newBuilder()
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags()));
if (message.getNackOptions() != null) {
resultBuilder.setNackOptions(message.getNackOptions().toProto());
}
MapOuterClass.MapResponse response = MapOuterClass.MapResponse.newBuilder()
.setId(requestID)
.addResults(MapOuterClass.MapResponse.Result.newBuilder()
.setValue(
message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.build()).build();
.addResults(resultBuilder.build())
.build();
supervisorActor.tell(response, ActorRef.noSender());
}

Expand Down
53 changes: 53 additions & 0 deletions src/main/java/io/numaproj/numaflow/shared/NackOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.numaproj.numaflow.shared;

import lombok.Builder;
import lombok.Getter;

/**
* NackOptions carries per-message redelivery options for a negative acknowledgement (nack).
* All fields are optional; a null value means unset.
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class NackOptions {
/** redelivery delay in milliseconds. */
private final Long delay;
/** maximum number of redelivery attempts. */
private final Integer maxDeliveries;
/** human-readable reason for the nack. */
private final String reason;

/** Converts to the outgoing proto type, setting only the fields that are present. */
public common.NackOptionsOuterClass.NackOptions toProto() {
common.NackOptionsOuterClass.NackOptions.Builder b =
common.NackOptionsOuterClass.NackOptions.newBuilder();
if (delay != null) {
b.setDelay(delay);
}
if (maxDeliveries != null) {
b.setMaxDeliveries(maxDeliveries);
}
if (reason != null) {
b.setReason(reason);
}
return b.build();
}

/** Converts from the incoming proto type. Returns null for null input. */
public static NackOptions fromProto(common.NackOptionsOuterClass.NackOptions p) {
if (p == null) {
return null;
}
NackOptionsBuilder b = NackOptions.newBuilder();
if (p.hasDelay()) {
b.delay(p.getDelay());
}
if (p.hasMaxDeliveries()) {
b.maxDeliveries(p.getMaxDeliveries());
}
if (p.hasReason()) {
b.reason(p.getReason());
}
return b.build();
}
}
25 changes: 20 additions & 5 deletions src/main/java/io/numaproj/numaflow/sinker/Response.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.sinker;

import io.numaproj.numaflow.shared.NackOptions;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -20,6 +21,8 @@ public class Response {
private final byte[] serveResponse;
private final Boolean onSuccess;
private final Message onSuccessMessage;
private final Boolean nack;
private final NackOptions nackOptions;

/**
* Static method to create response for successful message processing.
Expand All @@ -28,7 +31,7 @@ public class Response {
* @return Response object with success status
*/
public static Response responseOK(String id) {
return new Response(id, true, null, false, false, null, false, null);
return new Response(id, true, null, false, false, null, false, null, false, null);
}

/**
Expand All @@ -39,7 +42,7 @@ public static Response responseOK(String id) {
* @return Response object with failure status and error message
*/
public static Response responseFailure(String id, String errMsg) {
return new Response(id, false, errMsg, false, false, null, false, null);
return new Response(id, false, errMsg, false, false, null, false, null, false, null);
}

/**
Expand All @@ -50,7 +53,7 @@ public static Response responseFailure(String id, String errMsg) {
* @return Response object with fallback status
*/
public static Response responseFallback(String id) {
return new Response(id, false, null, true, false, null, false, null);
return new Response(id, false, null, true, false, null, false, null, false, null);
}

/**
Expand All @@ -63,7 +66,7 @@ public static Response responseFallback(String id) {
* @return Response object with serve status and serve response
*/
public static Response responseServe(String id, byte[] serveResponse) {
return new Response(id, false, null, false, true, serveResponse, false, null);
return new Response(id, false, null, false, true, serveResponse, false, null, false, null);
}

/**
Expand All @@ -76,6 +79,18 @@ public static Response responseServe(String id, byte[] serveResponse) {
* @return Response object with onSuccess status and onSuccess message
*/
public static Response responseOnSuccess(String id, Message onSuccessMessage) {
return new Response(id, false, null, false, false, null, true, onSuccessMessage);
return new Response(id, false, null, false, false, null, true, onSuccessMessage, false, null);
}

/**
* Static method to create a nack response, indicating the message should be negatively
* acknowledged and redelivered. nackOptions may be null.
*
* @param id id of the message
* @param nackOptions optional redelivery options
* @return Response object with nack status
*/
public static Response responseNack(String id, NackOptions nackOptions) {
return new Response(id, false, null, false, false, null, false, null, true, nackOptions);
}
}
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ private SinkOuterClass.SinkResponse.Result buildResult(Response response) {
.setStatus(SinkOuterClass.Status.ON_SUCCESS)
.setOnSuccessMsg(Message.toProto(response.getOnSuccessMessage()))
.build();
} else if (response.getNack() != null && response.getNack()) {
SinkOuterClass.SinkResponse.Result.Builder b = SinkOuterClass.SinkResponse.Result.newBuilder()
.setId(response.getId() == null ? "" : response.getId())
.setStatus(SinkOuterClass.Status.NACK);
if (response.getNackOptions() != null) {
b.setNackOptions(response.getNackOptions().toProto());
}
return b.build();
} else {
// FIXME: Return error when error message is not set?
return SinkOuterClass.SinkResponse.Result.newBuilder()
Expand Down
Loading
Loading