From 2febb4063d2fd3843295ed72f352f2a813d22d74 Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Wed, 1 Jul 2026 08:41:35 -0700 Subject: [PATCH 1/4] feat: end to end wiring for per message nack Signed-off-by: Abdullah Yildirim --- packages/pynumaflow/pynumaflow/_constants.py | 1 + packages/pynumaflow/pynumaflow/_nack.py | 39 ++++++++ .../pynumaflow/batchmapper/__init__.py | 2 + .../pynumaflow/batchmapper/_dtypes.py | 15 ++- .../batchmapper/servicer/async_servicer.py | 3 +- .../pynumaflow/pynumaflow/mapper/__init__.py | 2 + .../pynumaflow/pynumaflow/mapper/_dtypes.py | 15 ++- .../mapper/_servicer/_async_servicer.py | 2 + .../mapper/_servicer/_sync_servicer.py | 2 + .../pynumaflow/mapstreamer/__init__.py | 2 + .../pynumaflow/mapstreamer/_dtypes.py | 15 ++- .../mapstreamer/servicer/async_servicer.py | 3 +- .../proto/common/nack_options.proto | 13 +++ .../proto/common/nack_options_pb2.py | 36 +++++++ .../proto/common/nack_options_pb2.pyi | 15 +++ .../proto/common/nack_options_pb2_grpc.py | 24 +++++ .../pynumaflow/proto/mapper/map.proto | 2 + .../pynumaflow/proto/mapper/map_pb2.py | 39 ++++---- .../pynumaflow/proto/mapper/map_pb2.pyi | 7 +- .../pynumaflow/proto/sinker/sink.proto | 3 + .../pynumaflow/proto/sinker/sink_pb2.py | 47 ++++----- .../pynumaflow/proto/sinker/sink_pb2.pyi | 9 +- .../pynumaflow/proto/sourcer/source.proto | 2 + .../pynumaflow/proto/sourcer/source_pb2.py | 99 ++++++++++--------- .../pynumaflow/proto/sourcer/source_pb2.pyi | 7 +- .../proto/sourcetransformer/transform.proto | 2 + .../proto/sourcetransformer/transform_pb2.py | 35 +++---- .../proto/sourcetransformer/transform_pb2.pyi | 7 +- .../pynumaflow/pynumaflow/sinker/__init__.py | 2 + .../pynumaflow/pynumaflow/sinker/_dtypes.py | 28 +++++- .../pynumaflow/sinker/servicer/utils.py | 7 ++ .../pynumaflow/pynumaflow/sourcer/__init__.py | 2 + .../pynumaflow/pynumaflow/sourcer/_dtypes.py | 16 ++- .../sourcer/servicer/async_servicer.py | 8 +- .../pynumaflow/sourcetransformer/__init__.py | 2 + .../pynumaflow/sourcetransformer/_dtypes.py | 19 +++- .../servicer/_async_servicer.py | 2 + .../sourcetransformer/servicer/_servicer.py | 2 + 38 files changed, 407 insertions(+), 129 deletions(-) create mode 100644 packages/pynumaflow/pynumaflow/_nack.py create mode 100644 packages/pynumaflow/pynumaflow/proto/common/nack_options.proto create mode 100644 packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2.py create mode 100644 packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2.pyi create mode 100644 packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2_grpc.py diff --git a/packages/pynumaflow/pynumaflow/_constants.py b/packages/pynumaflow/pynumaflow/_constants.py index 06bf0ead..ed015671 100644 --- a/packages/pynumaflow/pynumaflow/_constants.py +++ b/packages/pynumaflow/pynumaflow/_constants.py @@ -54,6 +54,7 @@ STREAM_EOF = "EOF" DELIMITER = ":" DROP = "U+005C__DROP__" +NACK = "U+005C__NACK__" _PROCESS_COUNT = os.cpu_count() # Cap max value to 16 diff --git a/packages/pynumaflow/pynumaflow/_nack.py b/packages/pynumaflow/pynumaflow/_nack.py new file mode 100644 index 00000000..e5c2ba59 --- /dev/null +++ b/packages/pynumaflow/pynumaflow/_nack.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from pynumaflow.proto.common import nack_options_pb2 + + +@dataclass +class NackOptions: + """Per-message redelivery options for a nack.""" + + delay: int | None = None + max_deliveries: int | None = None + reason: str | None = None + + def _to_proto(self) -> nack_options_pb2.NackOptions: + return nack_options_pb2.NackOptions( + reason=self.reason, + max_deliveries=self.max_deliveries, + delay=self.delay, + ) + + +def _nack_options_to_proto( + opts: NackOptions | None, +) -> nack_options_pb2.NackOptions | None: + if opts is None: + return None + return opts._to_proto() + + +def _nack_options_from_proto( + proto: nack_options_pb2.NackOptions, +) -> NackOptions: + return NackOptions( + delay=proto.delay if proto.HasField("delay") else None, + max_deliveries=proto.max_deliveries if proto.HasField("max_deliveries") else None, + reason=proto.reason if proto.HasField("reason") else None, + ) \ No newline at end of file diff --git a/packages/pynumaflow/pynumaflow/batchmapper/__init__.py b/packages/pynumaflow/pynumaflow/batchmapper/__init__.py index 0438d8f6..3374a0f1 100644 --- a/packages/pynumaflow/pynumaflow/batchmapper/__init__.py +++ b/packages/pynumaflow/pynumaflow/batchmapper/__init__.py @@ -8,6 +8,7 @@ BatchResponse, ) from pynumaflow.batchmapper.async_server import BatchMapAsyncServer +from pynumaflow._nack import NackOptions __all__ = [ "Message", @@ -17,4 +18,5 @@ "BatchMapper", "BatchResponses", "BatchResponse", + "NackOptions", ] diff --git a/packages/pynumaflow/pynumaflow/batchmapper/_dtypes.py b/packages/pynumaflow/pynumaflow/batchmapper/_dtypes.py index 754f4916..0cba0031 100644 --- a/packages/pynumaflow/pynumaflow/batchmapper/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/batchmapper/_dtypes.py @@ -5,7 +5,8 @@ from typing import TypeAlias, TypeVar from collections.abc import AsyncIterable, Callable -from pynumaflow._constants import DROP +from pynumaflow._constants import DROP, NACK +from pynumaflow._nack import NackOptions from pynumaflow._validate import _validate_message_fields M = TypeVar("M", bound="Message") @@ -27,6 +28,7 @@ class Message: _value: bytes _keys: list[str] _tags: list[str] + _nack_options: NackOptions | None def __init__(self, value: bytes, keys: list[str] | None = None, tags: list[str] | None = None): """ @@ -36,12 +38,23 @@ def __init__(self, value: bytes, keys: list[str] | None = None, tags: list[str] self._keys = keys or [] self._tags = tags or [] self._value = value or b"" + self._nack_options = None # returns the Message Object which will be dropped @classmethod def to_drop(cls: type[M]) -> M: return cls(b"", None, [DROP]) + @classmethod + def to_nack(cls: type[M], opts: NackOptions | None = None) -> M: + m = cls(b"", None, [NACK]) + m._nack_options = opts + return m + + @property + def nack_options(self) -> NackOptions | None: + return self._nack_options + @property def value(self) -> bytes: return self._value diff --git a/packages/pynumaflow/pynumaflow/batchmapper/servicer/async_servicer.py b/packages/pynumaflow/pynumaflow/batchmapper/servicer/async_servicer.py index 63733961..5706cc1c 100644 --- a/packages/pynumaflow/pynumaflow/batchmapper/servicer/async_servicer.py +++ b/packages/pynumaflow/pynumaflow/batchmapper/servicer/async_servicer.py @@ -10,6 +10,7 @@ from pynumaflow.shared.server import update_context_err from pynumaflow.types import NumaflowServicerContext from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING +from pynumaflow._nack import _nack_options_to_proto class AsyncBatchMapServicer(map_pb2_grpc.MapServicer): @@ -80,7 +81,7 @@ async def MapFn( for msg in batch_response.messages: single_req_resp.append( map_pb2.MapResponse.Result( - keys=msg.keys, value=msg.value, tags=msg.tags + keys=msg.keys, value=msg.value, tags=msg.tags, nack_options=_nack_options_to_proto(msg.nack_options) ) ) # send the response for a given ID back to the stream diff --git a/packages/pynumaflow/pynumaflow/mapper/__init__.py b/packages/pynumaflow/pynumaflow/mapper/__init__.py index bb699960..477cd187 100644 --- a/packages/pynumaflow/pynumaflow/mapper/__init__.py +++ b/packages/pynumaflow/pynumaflow/mapper/__init__.py @@ -4,6 +4,7 @@ from pynumaflow.mapper._dtypes import Message, Messages, Datum, DROP, Mapper from pynumaflow._metadata import UserMetadata, SystemMetadata +from pynumaflow._nack import NackOptions __all__ = [ "Message", @@ -16,4 +17,5 @@ "MapMultiprocServer", "UserMetadata", "SystemMetadata", + "NackOptions", ] diff --git a/packages/pynumaflow/pynumaflow/mapper/_dtypes.py b/packages/pynumaflow/pynumaflow/mapper/_dtypes.py index 06fa35d2..1f870e1b 100644 --- a/packages/pynumaflow/pynumaflow/mapper/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/mapper/_dtypes.py @@ -6,7 +6,8 @@ from collections.abc import Callable from warnings import warn -from pynumaflow._constants import DROP +from pynumaflow._constants import DROP, NACK +from pynumaflow._nack import NackOptions from pynumaflow._metadata import UserMetadata, SystemMetadata from pynumaflow._validate import _validate_message_fields @@ -30,6 +31,7 @@ class Message: _keys: list[str] _tags: list[str] _user_metadata: UserMetadata + _nack_options: NackOptions | None def __init__( self, @@ -46,12 +48,23 @@ def __init__( self._tags = tags or [] self._value = value or b"" self._user_metadata = user_metadata or UserMetadata() + self._nack_options = None # returns the Message Object which will be dropped @classmethod def to_drop(cls: type[M]) -> M: return cls(b"", None, [DROP]) + @classmethod + def to_nack(cls: type[M], opts: NackOptions | None = None) -> M: + m = cls(b"", None, [NACK]) + m._nack_options = opts + return m + + @property + def nack_options(self) -> NackOptions | None: + return self._nack_options + @property def value(self) -> bytes: return self._value diff --git a/packages/pynumaflow/pynumaflow/mapper/_servicer/_async_servicer.py b/packages/pynumaflow/pynumaflow/mapper/_servicer/_async_servicer.py index 7afe3b04..3d1801fd 100644 --- a/packages/pynumaflow/pynumaflow/mapper/_servicer/_async_servicer.py +++ b/packages/pynumaflow/pynumaflow/mapper/_servicer/_async_servicer.py @@ -7,6 +7,7 @@ from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING from pynumaflow.mapper._dtypes import MapAsyncCallable, Datum, MapError, Message, Messages from pynumaflow._metadata import _user_and_system_metadata_from_proto +from pynumaflow._nack import _nack_options_to_proto from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc from pynumaflow.shared.server import update_context_err from pynumaflow.types import NumaflowServicerContext @@ -143,6 +144,7 @@ async def _invoke_map(self, req: map_pb2.MapRequest, result_queue: NonBlockingIt value=msg.value, tags=msg.tags, metadata=msg.user_metadata._to_proto(), + nack_options=_nack_options_to_proto(msg.nack_options), ) ) await result_queue.put(map_pb2.MapResponse(results=datums, id=req.id)) diff --git a/packages/pynumaflow/pynumaflow/mapper/_servicer/_sync_servicer.py b/packages/pynumaflow/pynumaflow/mapper/_servicer/_sync_servicer.py index 49d0898f..107d9c43 100644 --- a/packages/pynumaflow/pynumaflow/mapper/_servicer/_sync_servicer.py +++ b/packages/pynumaflow/pynumaflow/mapper/_servicer/_sync_servicer.py @@ -9,6 +9,7 @@ from pynumaflow._constants import NUM_THREADS_DEFAULT, STREAM_EOF, _LOGGER, ERR_UDF_EXCEPTION_STRING from pynumaflow.mapper._dtypes import MapSyncCallable, Datum, MapError +from pynumaflow._nack import _nack_options_to_proto from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc from pynumaflow.shared.synciter import SyncIterator from pynumaflow.types import NumaflowServicerContext @@ -155,6 +156,7 @@ def _invoke_map( value=resp.value, tags=resp.tags, metadata=resp.user_metadata._to_proto(), + nack_options=_nack_options_to_proto(resp.nack_options), ) ) result_queue.put(map_pb2.MapResponse(results=results, id=request.id)) diff --git a/packages/pynumaflow/pynumaflow/mapstreamer/__init__.py b/packages/pynumaflow/pynumaflow/mapstreamer/__init__.py index f26f4bd4..f993e607 100644 --- a/packages/pynumaflow/pynumaflow/mapstreamer/__init__.py +++ b/packages/pynumaflow/pynumaflow/mapstreamer/__init__.py @@ -2,6 +2,7 @@ from pynumaflow.mapstreamer._dtypes import Message, Messages, Datum, MapStreamer from pynumaflow.mapstreamer.async_server import MapStreamAsyncServer +from pynumaflow._nack import NackOptions __all__ = [ "Message", @@ -10,4 +11,5 @@ "DROP", "MapStreamAsyncServer", "MapStreamer", + "NackOptions", ] diff --git a/packages/pynumaflow/pynumaflow/mapstreamer/_dtypes.py b/packages/pynumaflow/pynumaflow/mapstreamer/_dtypes.py index 8c8659e6..01e7fac9 100644 --- a/packages/pynumaflow/pynumaflow/mapstreamer/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/mapstreamer/_dtypes.py @@ -6,7 +6,8 @@ from collections.abc import AsyncIterable, Callable from warnings import warn -from pynumaflow._constants import DROP +from pynumaflow._constants import DROP, NACK +from pynumaflow._nack import NackOptions from pynumaflow._validate import _validate_message_fields M = TypeVar("M", bound="Message") @@ -27,6 +28,7 @@ class Message: _value: bytes _keys: list[str] _tags: list[str] + _nack_options: NackOptions | None def __init__(self, value: bytes, keys: list[str] | None = None, tags: list[str] | None = None): """ @@ -36,12 +38,23 @@ def __init__(self, value: bytes, keys: list[str] | None = None, tags: list[str] self._keys = keys or [] self._tags = tags or [] self._value = value or b"" + self._nack_options = None # returns the Message Object which will be dropped @classmethod def to_drop(cls: type[M]) -> M: return cls(b"", None, [DROP]) + @classmethod + def to_nack(cls: type[M], opts: NackOptions | None = None) -> M: + m = cls(b"", None, [NACK]) + m._nack_options = opts + return m + + @property + def nack_options(self) -> NackOptions | None: + return self._nack_options + @property def value(self) -> bytes: return self._value diff --git a/packages/pynumaflow/pynumaflow/mapstreamer/servicer/async_servicer.py b/packages/pynumaflow/pynumaflow/mapstreamer/servicer/async_servicer.py index d1a581f3..e70c3126 100644 --- a/packages/pynumaflow/pynumaflow/mapstreamer/servicer/async_servicer.py +++ b/packages/pynumaflow/pynumaflow/mapstreamer/servicer/async_servicer.py @@ -7,6 +7,7 @@ from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING from pynumaflow.mapstreamer import Datum from pynumaflow.mapstreamer._dtypes import MapStreamCallable, MapStreamError +from pynumaflow._nack import _nack_options_to_proto from pynumaflow.proto.mapper import map_pb2_grpc, map_pb2 from pynumaflow.shared.server import update_context_err from pynumaflow.types import NumaflowServicerContext @@ -158,7 +159,7 @@ async def _invoke_map_stream( # same time in the next vertex instead of in a true streaming fashion. # The asyncio.sleep(0) will yield the control back to event loop avoiding starvation. async for msg in self.__map_stream_handler(list(req.request.keys), datum): - res = map_pb2.MapResponse.Result(keys=msg.keys, value=msg.value, tags=msg.tags) + res = map_pb2.MapResponse.Result(keys=msg.keys, value=msg.value, tags=msg.tags, nack_options=_nack_options_to_proto(msg.nack_options)) await result_queue.put(map_pb2.MapResponse(results=[res], id=req.id)) await asyncio.sleep(0) diff --git a/packages/pynumaflow/pynumaflow/proto/common/nack_options.proto b/packages/pynumaflow/pynumaflow/proto/common/nack_options.proto new file mode 100644 index 00000000..c1a60bc5 --- /dev/null +++ b/packages/pynumaflow/pynumaflow/proto/common/nack_options.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +package common; + +// NackOptions carries per-message redelivery options for a negative acknowledgement (nack). +message NackOptions { + // reason is a human-readable reason for nacking the message. + optional string reason = 1; + // max_deliveries is the maximum number of redelivery attempts. + optional uint32 max_deliveries = 2; + // delay is the redelivery delay in milliseconds. + optional uint64 delay = 3; +} diff --git a/packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2.py b/packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2.py new file mode 100644 index 00000000..8d8a3842 --- /dev/null +++ b/packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: pynumaflow/proto/common/nack_options.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'pynumaflow/proto/common/nack_options.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n*pynumaflow/proto/common/nack_options.proto\x12\x06\x63ommon\"{\n\x0bNackOptions\x12\x13\n\x06reason\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x1b\n\x0emax_deliveries\x18\x02 \x01(\rH\x01\x88\x01\x01\x12\x12\n\x05\x64\x65lay\x18\x03 \x01(\x04H\x02\x88\x01\x01\x42\t\n\x07_reasonB\x11\n\x0f_max_deliveriesB\x08\n\x06_delayb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'pynumaflow.proto.common.nack_options_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_NACKOPTIONS']._serialized_start=54 + _globals['_NACKOPTIONS']._serialized_end=177 +# @@protoc_insertion_point(module_scope) diff --git a/packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2.pyi b/packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2.pyi new file mode 100644 index 00000000..695b5437 --- /dev/null +++ b/packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2.pyi @@ -0,0 +1,15 @@ +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Optional as _Optional + +DESCRIPTOR: _descriptor.FileDescriptor + +class NackOptions(_message.Message): + __slots__ = ("reason", "max_deliveries", "delay") + REASON_FIELD_NUMBER: _ClassVar[int] + MAX_DELIVERIES_FIELD_NUMBER: _ClassVar[int] + DELAY_FIELD_NUMBER: _ClassVar[int] + reason: str + max_deliveries: int + delay: int + def __init__(self, reason: _Optional[str] = ..., max_deliveries: _Optional[int] = ..., delay: _Optional[int] = ...) -> None: ... diff --git a/packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2_grpc.py b/packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2_grpc.py new file mode 100644 index 00000000..92a2fb28 --- /dev/null +++ b/packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2_grpc.py @@ -0,0 +1,24 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + + +GRPC_GENERATED_VERSION = '1.75.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in pynumaflow/proto/common/nack_options_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) diff --git a/packages/pynumaflow/pynumaflow/proto/mapper/map.proto b/packages/pynumaflow/pynumaflow/proto/mapper/map.proto index 58518d94..3f858b46 100644 --- a/packages/pynumaflow/pynumaflow/proto/mapper/map.proto +++ b/packages/pynumaflow/pynumaflow/proto/mapper/map.proto @@ -5,6 +5,7 @@ option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"; import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; import "pynumaflow/proto/common/metadata.proto"; +import "pynumaflow/proto/common/nack_options.proto"; package map.v1; @@ -61,6 +62,7 @@ message MapResponse { repeated string tags = 3; // metadata of the message common.Metadata metadata = 4; + optional common.NackOptions nack_options = 5; } repeated Result results = 1; // This ID is used to refer the responses to the request it corresponds to. diff --git a/packages/pynumaflow/pynumaflow/proto/mapper/map_pb2.py b/packages/pynumaflow/pynumaflow/proto/mapper/map_pb2.py index 924d5f74..e3e720a0 100644 --- a/packages/pynumaflow/pynumaflow/proto/mapper/map_pb2.py +++ b/packages/pynumaflow/pynumaflow/proto/mapper/map_pb2.py @@ -25,9 +25,10 @@ from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 from pynumaflow.proto.common import metadata_pb2 as pynumaflow_dot_proto_dot_common_dot_metadata__pb2 +from pynumaflow.proto.common import nack_options_pb2 as pynumaflow_dot_proto_dot_common_dot_nack__options__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n!pynumaflow/proto/mapper/map.proto\x12\x06map.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&pynumaflow/proto/common/metadata.proto\"\xd0\x03\n\nMapRequest\x12+\n\x07request\x18\x01 \x01(\x0b\x32\x1a.map.v1.MapRequest.Request\x12\n\n\x02id\x18\x02 \x01(\t\x12)\n\thandshake\x18\x03 \x01(\x0b\x32\x11.map.v1.HandshakeH\x00\x88\x01\x01\x12/\n\x06status\x18\x04 \x01(\x0b\x32\x1a.map.v1.TransmissionStatusH\x01\x88\x01\x01\x1a\x93\x02\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x38\n\x07headers\x18\x05 \x03(\x0b\x32\'.map.v1.MapRequest.Request.HeadersEntry\x12\"\n\x08metadata\x18\x06 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshakeB\t\n\x07_status\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"!\n\x12TransmissionStatus\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\"\x94\x02\n\x0bMapResponse\x12+\n\x07results\x18\x01 \x03(\x0b\x32\x1a.map.v1.MapResponse.Result\x12\n\n\x02id\x18\x02 \x01(\t\x12)\n\thandshake\x18\x03 \x01(\x0b\x32\x11.map.v1.HandshakeH\x00\x88\x01\x01\x12/\n\x06status\x18\x04 \x01(\x0b\x32\x1a.map.v1.TransmissionStatusH\x01\x88\x01\x01\x1aW\n\x06Result\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12\x0c\n\x04tags\x18\x03 \x03(\t\x12\"\n\x08metadata\x18\x04 \x01(\x0b\x32\x10.common.MetadataB\x0c\n\n_handshakeB\t\n\x07_status\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\x32u\n\x03Map\x12\x34\n\x05MapFn\x12\x12.map.v1.MapRequest\x1a\x13.map.v1.MapResponse(\x01\x30\x01\x12\x38\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x15.map.v1.ReadyResponseB7Z5github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1b\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n!pynumaflow/proto/mapper/map.proto\x12\x06map.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&pynumaflow/proto/common/metadata.proto\x1a*pynumaflow/proto/common/nack_options.proto\"\xd0\x03\n\nMapRequest\x12+\n\x07request\x18\x01 \x01(\x0b\x32\x1a.map.v1.MapRequest.Request\x12\n\n\x02id\x18\x02 \x01(\t\x12)\n\thandshake\x18\x03 \x01(\x0b\x32\x11.map.v1.HandshakeH\x00\x88\x01\x01\x12/\n\x06status\x18\x04 \x01(\x0b\x32\x1a.map.v1.TransmissionStatusH\x01\x88\x01\x01\x1a\x93\x02\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x38\n\x07headers\x18\x05 \x03(\x0b\x32\'.map.v1.MapRequest.Request.HeadersEntry\x12\"\n\x08metadata\x18\x06 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshakeB\t\n\x07_status\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"!\n\x12TransmissionStatus\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\"\xd6\x02\n\x0bMapResponse\x12+\n\x07results\x18\x01 \x03(\x0b\x32\x1a.map.v1.MapResponse.Result\x12\n\n\x02id\x18\x02 \x01(\t\x12)\n\thandshake\x18\x03 \x01(\x0b\x32\x11.map.v1.HandshakeH\x00\x88\x01\x01\x12/\n\x06status\x18\x04 \x01(\x0b\x32\x1a.map.v1.TransmissionStatusH\x01\x88\x01\x01\x1a\x98\x01\n\x06Result\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12\x0c\n\x04tags\x18\x03 \x03(\t\x12\"\n\x08metadata\x18\x04 \x01(\x0b\x32\x10.common.Metadata\x12.\n\x0cnack_options\x18\x05 \x01(\x0b\x32\x13.common.NackOptionsH\x00\x88\x01\x01\x42\x0f\n\r_nack_optionsB\x0c\n\n_handshakeB\t\n\x07_status\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\x32u\n\x03Map\x12\x34\n\x05MapFn\x12\x12.map.v1.MapRequest\x1a\x13.map.v1.MapResponse(\x01\x30\x01\x12\x38\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x15.map.v1.ReadyResponseB7Z5github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1b\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -37,22 +38,22 @@ _globals['DESCRIPTOR']._serialized_options = b'Z5github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1' _globals['_MAPREQUEST_REQUEST_HEADERSENTRY']._loaded_options = None _globals['_MAPREQUEST_REQUEST_HEADERSENTRY']._serialized_options = b'8\001' - _globals['_MAPREQUEST']._serialized_start=148 - _globals['_MAPREQUEST']._serialized_end=612 - _globals['_MAPREQUEST_REQUEST']._serialized_start=312 - _globals['_MAPREQUEST_REQUEST']._serialized_end=587 - _globals['_MAPREQUEST_REQUEST_HEADERSENTRY']._serialized_start=541 - _globals['_MAPREQUEST_REQUEST_HEADERSENTRY']._serialized_end=587 - _globals['_HANDSHAKE']._serialized_start=614 - _globals['_HANDSHAKE']._serialized_end=638 - _globals['_TRANSMISSIONSTATUS']._serialized_start=640 - _globals['_TRANSMISSIONSTATUS']._serialized_end=673 - _globals['_MAPRESPONSE']._serialized_start=676 - _globals['_MAPRESPONSE']._serialized_end=952 - _globals['_MAPRESPONSE_RESULT']._serialized_start=840 - _globals['_MAPRESPONSE_RESULT']._serialized_end=927 - _globals['_READYRESPONSE']._serialized_start=954 - _globals['_READYRESPONSE']._serialized_end=984 - _globals['_MAP']._serialized_start=986 - _globals['_MAP']._serialized_end=1103 + _globals['_MAPREQUEST']._serialized_start=192 + _globals['_MAPREQUEST']._serialized_end=656 + _globals['_MAPREQUEST_REQUEST']._serialized_start=356 + _globals['_MAPREQUEST_REQUEST']._serialized_end=631 + _globals['_MAPREQUEST_REQUEST_HEADERSENTRY']._serialized_start=585 + _globals['_MAPREQUEST_REQUEST_HEADERSENTRY']._serialized_end=631 + _globals['_HANDSHAKE']._serialized_start=658 + _globals['_HANDSHAKE']._serialized_end=682 + _globals['_TRANSMISSIONSTATUS']._serialized_start=684 + _globals['_TRANSMISSIONSTATUS']._serialized_end=717 + _globals['_MAPRESPONSE']._serialized_start=720 + _globals['_MAPRESPONSE']._serialized_end=1062 + _globals['_MAPRESPONSE_RESULT']._serialized_start=885 + _globals['_MAPRESPONSE_RESULT']._serialized_end=1037 + _globals['_READYRESPONSE']._serialized_start=1064 + _globals['_READYRESPONSE']._serialized_end=1094 + _globals['_MAP']._serialized_start=1096 + _globals['_MAP']._serialized_end=1213 # @@protoc_insertion_point(module_scope) diff --git a/packages/pynumaflow/pynumaflow/proto/mapper/map_pb2.pyi b/packages/pynumaflow/pynumaflow/proto/mapper/map_pb2.pyi index 1f94a05f..3fb7451f 100644 --- a/packages/pynumaflow/pynumaflow/proto/mapper/map_pb2.pyi +++ b/packages/pynumaflow/pynumaflow/proto/mapper/map_pb2.pyi @@ -3,6 +3,7 @@ import datetime from google.protobuf import empty_pb2 as _empty_pb2 from google.protobuf import timestamp_pb2 as _timestamp_pb2 from pynumaflow.proto.common import metadata_pb2 as _metadata_pb2 +from pynumaflow.proto.common import nack_options_pb2 as _nack_options_pb2 from google.protobuf.internal import containers as _containers from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message @@ -60,16 +61,18 @@ class TransmissionStatus(_message.Message): class MapResponse(_message.Message): __slots__ = ("results", "id", "handshake", "status") class Result(_message.Message): - __slots__ = ("keys", "value", "tags", "metadata") + __slots__ = ("keys", "value", "tags", "metadata", "nack_options") KEYS_FIELD_NUMBER: _ClassVar[int] VALUE_FIELD_NUMBER: _ClassVar[int] TAGS_FIELD_NUMBER: _ClassVar[int] METADATA_FIELD_NUMBER: _ClassVar[int] + NACK_OPTIONS_FIELD_NUMBER: _ClassVar[int] keys: _containers.RepeatedScalarFieldContainer[str] value: bytes tags: _containers.RepeatedScalarFieldContainer[str] metadata: _metadata_pb2.Metadata - def __init__(self, keys: _Optional[_Iterable[str]] = ..., value: _Optional[bytes] = ..., tags: _Optional[_Iterable[str]] = ..., metadata: _Optional[_Union[_metadata_pb2.Metadata, _Mapping]] = ...) -> None: ... + nack_options: _nack_options_pb2.NackOptions + def __init__(self, keys: _Optional[_Iterable[str]] = ..., value: _Optional[bytes] = ..., tags: _Optional[_Iterable[str]] = ..., metadata: _Optional[_Union[_metadata_pb2.Metadata, _Mapping]] = ..., nack_options: _Optional[_Union[_nack_options_pb2.NackOptions, _Mapping]] = ...) -> None: ... RESULTS_FIELD_NUMBER: _ClassVar[int] ID_FIELD_NUMBER: _ClassVar[int] HANDSHAKE_FIELD_NUMBER: _ClassVar[int] diff --git a/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto b/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto index fdba28d2..ae3d8bcb 100644 --- a/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto +++ b/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto @@ -3,6 +3,7 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; import "pynumaflow/proto/common/metadata.proto"; +import "pynumaflow/proto/common/nack_options.proto"; package sink.v1; @@ -68,6 +69,7 @@ enum Status { FALLBACK = 2; SERVE = 3; ON_SUCCESS = 4; + NACK = 5; } /** @@ -89,6 +91,7 @@ message SinkResponse { optional bytes serve_response = 4; // on_success_msg is the message to be sent to on_success sink. optional Message on_success_msg = 5; + optional common.NackOptions nack_options = 6; } repeated Result results = 1; optional Handshake handshake = 2; diff --git a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py index 651b2784..afa617e3 100644 --- a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py +++ b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py @@ -25,9 +25,10 @@ from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 from pynumaflow.proto.common import metadata_pb2 as pynumaflow_dot_proto_dot_common_dot_metadata__pb2 +from pynumaflow.proto.common import nack_options_pb2 as pynumaflow_dot_proto_dot_common_dot_nack__options__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"pynumaflow/proto/sinker/sink.proto\x12\x07sink.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&pynumaflow/proto/common/metadata.proto\"\xc7\x03\n\x0bSinkRequest\x12-\n\x07request\x18\x01 \x01(\x0b\x32\x1c.sink.v1.SinkRequest.Request\x12+\n\x06status\x18\x02 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatus\x12*\n\thandshake\x18\x03 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x1a\xa1\x02\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\n\n\x02id\x18\x05 \x01(\t\x12:\n\x07headers\x18\x06 \x03(\x0b\x32).sink.v1.SinkRequest.Request.HeadersEntry\x12\"\n\x08metadata\x18\x07 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshake\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\"!\n\x12TransmissionStatus\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\"\xcf\x03\n\x0cSinkResponse\x12-\n\x07results\x18\x01 \x03(\x0b\x32\x1c.sink.v1.SinkResponse.Result\x12*\n\thandshake\x18\x02 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x12\x30\n\x06status\x18\x03 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatusH\x01\x88\x01\x01\x1a\x98\x02\n\x06Result\x12\n\n\x02id\x18\x01 \x01(\t\x12\x1f\n\x06status\x18\x02 \x01(\x0e\x32\x0f.sink.v1.Status\x12\x0f\n\x07\x65rr_msg\x18\x03 \x01(\t\x12\x1b\n\x0eserve_response\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x12\x41\n\x0eon_success_msg\x18\x05 \x01(\x0b\x32$.sink.v1.SinkResponse.Result.MessageH\x01\x88\x01\x01\x1aJ\n\x07Message\x12\r\n\x05value\x18\x01 \x01(\x0c\x12\x0c\n\x04keys\x18\x02 \x03(\t\x12\"\n\x08metadata\x18\x03 \x01(\x0b\x32\x10.common.MetadataB\x11\n\x0f_serve_responseB\x11\n\x0f_on_success_msgB\x0c\n\n_handshakeB\t\n\x07_status*K\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x12\x0c\n\x08\x46\x41LLBACK\x10\x02\x12\t\n\x05SERVE\x10\x03\x12\x0e\n\nON_SUCCESS\x10\x04\x32|\n\x04Sink\x12\x39\n\x06SinkFn\x12\x14.sink.v1.SinkRequest\x1a\x15.sink.v1.SinkResponse(\x01\x30\x01\x12\x39\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x16.sink.v1.ReadyResponseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"pynumaflow/proto/sinker/sink.proto\x12\x07sink.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&pynumaflow/proto/common/metadata.proto\x1a*pynumaflow/proto/common/nack_options.proto\"\xc7\x03\n\x0bSinkRequest\x12-\n\x07request\x18\x01 \x01(\x0b\x32\x1c.sink.v1.SinkRequest.Request\x12+\n\x06status\x18\x02 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatus\x12*\n\thandshake\x18\x03 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x1a\xa1\x02\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\n\n\x02id\x18\x05 \x01(\t\x12:\n\x07headers\x18\x06 \x03(\x0b\x32).sink.v1.SinkRequest.Request.HeadersEntry\x12\"\n\x08metadata\x18\x07 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshake\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\"!\n\x12TransmissionStatus\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\"\x90\x04\n\x0cSinkResponse\x12-\n\x07results\x18\x01 \x03(\x0b\x32\x1c.sink.v1.SinkResponse.Result\x12*\n\thandshake\x18\x02 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x12\x30\n\x06status\x18\x03 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatusH\x01\x88\x01\x01\x1a\xd9\x02\n\x06Result\x12\n\n\x02id\x18\x01 \x01(\t\x12\x1f\n\x06status\x18\x02 \x01(\x0e\x32\x0f.sink.v1.Status\x12\x0f\n\x07\x65rr_msg\x18\x03 \x01(\t\x12\x1b\n\x0eserve_response\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x12\x41\n\x0eon_success_msg\x18\x05 \x01(\x0b\x32$.sink.v1.SinkResponse.Result.MessageH\x01\x88\x01\x01\x12.\n\x0cnack_options\x18\x06 \x01(\x0b\x32\x13.common.NackOptionsH\x02\x88\x01\x01\x1aJ\n\x07Message\x12\r\n\x05value\x18\x01 \x01(\x0c\x12\x0c\n\x04keys\x18\x02 \x03(\t\x12\"\n\x08metadata\x18\x03 \x01(\x0b\x32\x10.common.MetadataB\x11\n\x0f_serve_responseB\x11\n\x0f_on_success_msgB\x0f\n\r_nack_optionsB\x0c\n\n_handshakeB\t\n\x07_status*U\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x12\x0c\n\x08\x46\x41LLBACK\x10\x02\x12\t\n\x05SERVE\x10\x03\x12\x0e\n\nON_SUCCESS\x10\x04\x12\x08\n\x04NACK\x10\x05\x32|\n\x04Sink\x12\x39\n\x06SinkFn\x12\x14.sink.v1.SinkRequest\x1a\x15.sink.v1.SinkResponse(\x01\x30\x01\x12\x39\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x16.sink.v1.ReadyResponseb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -36,26 +37,26 @@ DESCRIPTOR._loaded_options = None _globals['_SINKREQUEST_REQUEST_HEADERSENTRY']._loaded_options = None _globals['_SINKREQUEST_REQUEST_HEADERSENTRY']._serialized_options = b'8\001' - _globals['_STATUS']._serialized_start=1166 - _globals['_STATUS']._serialized_end=1241 - _globals['_SINKREQUEST']._serialized_start=150 - _globals['_SINKREQUEST']._serialized_end=605 - _globals['_SINKREQUEST_REQUEST']._serialized_start=302 - _globals['_SINKREQUEST_REQUEST']._serialized_end=591 - _globals['_SINKREQUEST_REQUEST_HEADERSENTRY']._serialized_start=545 - _globals['_SINKREQUEST_REQUEST_HEADERSENTRY']._serialized_end=591 - _globals['_HANDSHAKE']._serialized_start=607 - _globals['_HANDSHAKE']._serialized_end=631 - _globals['_READYRESPONSE']._serialized_start=633 - _globals['_READYRESPONSE']._serialized_end=663 - _globals['_TRANSMISSIONSTATUS']._serialized_start=665 - _globals['_TRANSMISSIONSTATUS']._serialized_end=698 - _globals['_SINKRESPONSE']._serialized_start=701 - _globals['_SINKRESPONSE']._serialized_end=1164 - _globals['_SINKRESPONSE_RESULT']._serialized_start=859 - _globals['_SINKRESPONSE_RESULT']._serialized_end=1139 - _globals['_SINKRESPONSE_RESULT_MESSAGE']._serialized_start=1027 - _globals['_SINKRESPONSE_RESULT_MESSAGE']._serialized_end=1101 - _globals['_SINK']._serialized_start=1243 - _globals['_SINK']._serialized_end=1367 + _globals['_STATUS']._serialized_start=1275 + _globals['_STATUS']._serialized_end=1360 + _globals['_SINKREQUEST']._serialized_start=194 + _globals['_SINKREQUEST']._serialized_end=649 + _globals['_SINKREQUEST_REQUEST']._serialized_start=346 + _globals['_SINKREQUEST_REQUEST']._serialized_end=635 + _globals['_SINKREQUEST_REQUEST_HEADERSENTRY']._serialized_start=589 + _globals['_SINKREQUEST_REQUEST_HEADERSENTRY']._serialized_end=635 + _globals['_HANDSHAKE']._serialized_start=651 + _globals['_HANDSHAKE']._serialized_end=675 + _globals['_READYRESPONSE']._serialized_start=677 + _globals['_READYRESPONSE']._serialized_end=707 + _globals['_TRANSMISSIONSTATUS']._serialized_start=709 + _globals['_TRANSMISSIONSTATUS']._serialized_end=742 + _globals['_SINKRESPONSE']._serialized_start=745 + _globals['_SINKRESPONSE']._serialized_end=1273 + _globals['_SINKRESPONSE_RESULT']._serialized_start=903 + _globals['_SINKRESPONSE_RESULT']._serialized_end=1248 + _globals['_SINKRESPONSE_RESULT_MESSAGE']._serialized_start=1119 + _globals['_SINKRESPONSE_RESULT_MESSAGE']._serialized_end=1193 + _globals['_SINK']._serialized_start=1362 + _globals['_SINK']._serialized_end=1486 # @@protoc_insertion_point(module_scope) diff --git a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi index 2c696e72..df5c4bfa 100644 --- a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi +++ b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi @@ -3,6 +3,7 @@ import datetime from google.protobuf import empty_pb2 as _empty_pb2 from google.protobuf import timestamp_pb2 as _timestamp_pb2 from pynumaflow.proto.common import metadata_pb2 as _metadata_pb2 +from pynumaflow.proto.common import nack_options_pb2 as _nack_options_pb2 from google.protobuf.internal import containers as _containers from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper from google.protobuf import descriptor as _descriptor @@ -19,11 +20,13 @@ class Status(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): FALLBACK: _ClassVar[Status] SERVE: _ClassVar[Status] ON_SUCCESS: _ClassVar[Status] + NACK: _ClassVar[Status] SUCCESS: Status FAILURE: Status FALLBACK: Status SERVE: Status ON_SUCCESS: Status +NACK: Status class SinkRequest(_message.Message): __slots__ = ("request", "status", "handshake") @@ -80,7 +83,7 @@ class TransmissionStatus(_message.Message): class SinkResponse(_message.Message): __slots__ = ("results", "handshake", "status") class Result(_message.Message): - __slots__ = ("id", "status", "err_msg", "serve_response", "on_success_msg") + __slots__ = ("id", "status", "err_msg", "serve_response", "on_success_msg", "nack_options") class Message(_message.Message): __slots__ = ("value", "keys", "metadata") VALUE_FIELD_NUMBER: _ClassVar[int] @@ -95,12 +98,14 @@ class SinkResponse(_message.Message): ERR_MSG_FIELD_NUMBER: _ClassVar[int] SERVE_RESPONSE_FIELD_NUMBER: _ClassVar[int] ON_SUCCESS_MSG_FIELD_NUMBER: _ClassVar[int] + NACK_OPTIONS_FIELD_NUMBER: _ClassVar[int] id: str status: Status err_msg: str serve_response: bytes on_success_msg: SinkResponse.Result.Message - def __init__(self, id: _Optional[str] = ..., status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., serve_response: _Optional[bytes] = ..., on_success_msg: _Optional[_Union[SinkResponse.Result.Message, _Mapping]] = ...) -> None: ... + nack_options: _nack_options_pb2.NackOptions + def __init__(self, id: _Optional[str] = ..., status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., serve_response: _Optional[bytes] = ..., on_success_msg: _Optional[_Union[SinkResponse.Result.Message, _Mapping]] = ..., nack_options: _Optional[_Union[_nack_options_pb2.NackOptions, _Mapping]] = ...) -> None: ... RESULTS_FIELD_NUMBER: _ClassVar[int] HANDSHAKE_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] diff --git a/packages/pynumaflow/pynumaflow/proto/sourcer/source.proto b/packages/pynumaflow/pynumaflow/proto/sourcer/source.proto index 9bcd4aff..33ec0768 100644 --- a/packages/pynumaflow/pynumaflow/proto/sourcer/source.proto +++ b/packages/pynumaflow/pynumaflow/proto/sourcer/source.proto @@ -3,6 +3,7 @@ syntax = "proto3"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; import "pynumaflow/proto/common/metadata.proto"; +import "pynumaflow/proto/common/nack_options.proto"; package source.v1; @@ -150,6 +151,7 @@ message NackRequest { message Request { // Required field holding the offsets to be nacked repeated Offset offsets = 1; + optional common.NackOptions nack_options = 2; } // Required field holding the request. The list will be ordered and will have the same order as the original Read response. Request request = 1; diff --git a/packages/pynumaflow/pynumaflow/proto/sourcer/source_pb2.py b/packages/pynumaflow/pynumaflow/proto/sourcer/source_pb2.py index 041838f8..8149aab4 100644 --- a/packages/pynumaflow/pynumaflow/proto/sourcer/source_pb2.py +++ b/packages/pynumaflow/pynumaflow/proto/sourcer/source_pb2.py @@ -25,9 +25,10 @@ from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 from pynumaflow.proto.common import metadata_pb2 as pynumaflow_dot_proto_dot_common_dot_metadata__pb2 +from pynumaflow.proto.common import nack_options_pb2 as pynumaflow_dot_proto_dot_common_dot_nack__options__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n%pynumaflow/proto/sourcer/source.proto\x12\tsource.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a&pynumaflow/proto/common/metadata.proto\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"\xb1\x01\n\x0bReadRequest\x12/\n\x07request\x18\x01 \x01(\x0b\x32\x1e.source.v1.ReadRequest.Request\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\x35\n\x07Request\x12\x13\n\x0bnum_records\x18\x01 \x01(\x04\x12\x15\n\rtimeout_in_ms\x18\x02 \x01(\rB\x0c\n\n_handshake\"\xa5\x05\n\x0cReadResponse\x12.\n\x06result\x18\x01 \x01(\x0b\x32\x1e.source.v1.ReadResponse.Result\x12.\n\x06status\x18\x02 \x01(\x0b\x32\x1e.source.v1.ReadResponse.Status\x12,\n\thandshake\x18\x03 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\x8c\x02\n\x06Result\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12!\n\x06offset\x18\x02 \x01(\x0b\x32\x11.source.v1.Offset\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04keys\x18\x04 \x03(\t\x12<\n\x07headers\x18\x05 \x03(\x0b\x32+.source.v1.ReadResponse.Result.HeadersEntry\x12\"\n\x08metadata\x18\x06 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xe9\x01\n\x06Status\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\x12\x31\n\x04\x63ode\x18\x02 \x01(\x0e\x32#.source.v1.ReadResponse.Status.Code\x12\x38\n\x05\x65rror\x18\x03 \x01(\x0e\x32$.source.v1.ReadResponse.Status.ErrorH\x00\x88\x01\x01\x12\x10\n\x03msg\x18\x04 \x01(\tH\x01\x88\x01\x01\" \n\x04\x43ode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\"\x1f\n\x05\x45rror\x12\x0b\n\x07UNACKED\x10\x00\x12\t\n\x05OTHER\x10\x01\x42\x08\n\x06_errorB\x06\n\x04_msgB\x0c\n\n_handshake\"\xa7\x01\n\nAckRequest\x12.\n\x07request\x18\x01 \x01(\x0b\x32\x1d.source.v1.AckRequest.Request\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a-\n\x07Request\x12\"\n\x07offsets\x18\x01 \x03(\x0b\x32\x11.source.v1.OffsetB\x0c\n\n_handshake\"\xab\x01\n\x0b\x41\x63kResponse\x12-\n\x06result\x18\x01 \x01(\x0b\x32\x1d.source.v1.AckResponse.Result\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\x31\n\x06Result\x12\'\n\x07success\x18\x01 \x01(\x0b\x32\x16.google.protobuf.EmptyB\x0c\n\n_handshake\"m\n\x0bNackRequest\x12/\n\x07request\x18\x01 \x01(\x0b\x32\x1e.source.v1.NackRequest.Request\x1a-\n\x07Request\x12\"\n\x07offsets\x18\x01 \x03(\x0b\x32\x11.source.v1.Offset\"q\n\x0cNackResponse\x12.\n\x06result\x18\x01 \x01(\x0b\x32\x1e.source.v1.NackResponse.Result\x1a\x31\n\x06Result\x12\'\n\x07success\x18\x01 \x01(\x0b\x32\x16.google.protobuf.Empty\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\"]\n\x0fPendingResponse\x12\x31\n\x06result\x18\x01 \x01(\x0b\x32!.source.v1.PendingResponse.Result\x1a\x17\n\x06Result\x12\r\n\x05\x63ount\x18\x01 \x01(\x03\"\x9c\x01\n\x12PartitionsResponse\x12\x34\n\x06result\x18\x01 \x01(\x0b\x32$.source.v1.PartitionsResponse.Result\x1aP\n\x06Result\x12\x12\n\npartitions\x18\x01 \x03(\x05\x12\x1d\n\x10total_partitions\x18\x02 \x01(\x05H\x00\x88\x01\x01\x42\x13\n\x11_total_partitions\".\n\x06Offset\x12\x0e\n\x06offset\x18\x01 \x01(\x0c\x12\x14\n\x0cpartition_id\x18\x02 \x01(\x05\x32\x83\x03\n\x06Source\x12=\n\x06ReadFn\x12\x16.source.v1.ReadRequest\x1a\x17.source.v1.ReadResponse(\x01\x30\x01\x12:\n\x05\x41\x63kFn\x12\x15.source.v1.AckRequest\x1a\x16.source.v1.AckResponse(\x01\x30\x01\x12\x39\n\x06NackFn\x12\x16.source.v1.NackRequest\x1a\x17.source.v1.NackResponse\x12?\n\tPendingFn\x12\x16.google.protobuf.Empty\x1a\x1a.source.v1.PendingResponse\x12\x45\n\x0cPartitionsFn\x12\x16.google.protobuf.Empty\x1a\x1d.source.v1.PartitionsResponse\x12;\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x18.source.v1.ReadyResponseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n%pynumaflow/proto/sourcer/source.proto\x12\tsource.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a&pynumaflow/proto/common/metadata.proto\x1a*pynumaflow/proto/common/nack_options.proto\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"\xb1\x01\n\x0bReadRequest\x12/\n\x07request\x18\x01 \x01(\x0b\x32\x1e.source.v1.ReadRequest.Request\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\x35\n\x07Request\x12\x13\n\x0bnum_records\x18\x01 \x01(\x04\x12\x15\n\rtimeout_in_ms\x18\x02 \x01(\rB\x0c\n\n_handshake\"\xa5\x05\n\x0cReadResponse\x12.\n\x06result\x18\x01 \x01(\x0b\x32\x1e.source.v1.ReadResponse.Result\x12.\n\x06status\x18\x02 \x01(\x0b\x32\x1e.source.v1.ReadResponse.Status\x12,\n\thandshake\x18\x03 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\x8c\x02\n\x06Result\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12!\n\x06offset\x18\x02 \x01(\x0b\x32\x11.source.v1.Offset\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04keys\x18\x04 \x03(\t\x12<\n\x07headers\x18\x05 \x03(\x0b\x32+.source.v1.ReadResponse.Result.HeadersEntry\x12\"\n\x08metadata\x18\x06 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xe9\x01\n\x06Status\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\x12\x31\n\x04\x63ode\x18\x02 \x01(\x0e\x32#.source.v1.ReadResponse.Status.Code\x12\x38\n\x05\x65rror\x18\x03 \x01(\x0e\x32$.source.v1.ReadResponse.Status.ErrorH\x00\x88\x01\x01\x12\x10\n\x03msg\x18\x04 \x01(\tH\x01\x88\x01\x01\" \n\x04\x43ode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\"\x1f\n\x05\x45rror\x12\x0b\n\x07UNACKED\x10\x00\x12\t\n\x05OTHER\x10\x01\x42\x08\n\x06_errorB\x06\n\x04_msgB\x0c\n\n_handshake\"\xa7\x01\n\nAckRequest\x12.\n\x07request\x18\x01 \x01(\x0b\x32\x1d.source.v1.AckRequest.Request\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a-\n\x07Request\x12\"\n\x07offsets\x18\x01 \x03(\x0b\x32\x11.source.v1.OffsetB\x0c\n\n_handshake\"\xab\x01\n\x0b\x41\x63kResponse\x12-\n\x06result\x18\x01 \x01(\x0b\x32\x1d.source.v1.AckResponse.Result\x12,\n\thandshake\x18\x02 \x01(\x0b\x32\x14.source.v1.HandshakeH\x00\x88\x01\x01\x1a\x31\n\x06Result\x12\'\n\x07success\x18\x01 \x01(\x0b\x32\x16.google.protobuf.EmptyB\x0c\n\n_handshake\"\xae\x01\n\x0bNackRequest\x12/\n\x07request\x18\x01 \x01(\x0b\x32\x1e.source.v1.NackRequest.Request\x1an\n\x07Request\x12\"\n\x07offsets\x18\x01 \x03(\x0b\x32\x11.source.v1.Offset\x12.\n\x0cnack_options\x18\x02 \x01(\x0b\x32\x13.common.NackOptionsH\x00\x88\x01\x01\x42\x0f\n\r_nack_options\"q\n\x0cNackResponse\x12.\n\x06result\x18\x01 \x01(\x0b\x32\x1e.source.v1.NackResponse.Result\x1a\x31\n\x06Result\x12\'\n\x07success\x18\x01 \x01(\x0b\x32\x16.google.protobuf.Empty\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\"]\n\x0fPendingResponse\x12\x31\n\x06result\x18\x01 \x01(\x0b\x32!.source.v1.PendingResponse.Result\x1a\x17\n\x06Result\x12\r\n\x05\x63ount\x18\x01 \x01(\x03\"\x9c\x01\n\x12PartitionsResponse\x12\x34\n\x06result\x18\x01 \x01(\x0b\x32$.source.v1.PartitionsResponse.Result\x1aP\n\x06Result\x12\x12\n\npartitions\x18\x01 \x03(\x05\x12\x1d\n\x10total_partitions\x18\x02 \x01(\x05H\x00\x88\x01\x01\x42\x13\n\x11_total_partitions\".\n\x06Offset\x12\x0e\n\x06offset\x18\x01 \x01(\x0c\x12\x14\n\x0cpartition_id\x18\x02 \x01(\x05\x32\x83\x03\n\x06Source\x12=\n\x06ReadFn\x12\x16.source.v1.ReadRequest\x1a\x17.source.v1.ReadResponse(\x01\x30\x01\x12:\n\x05\x41\x63kFn\x12\x15.source.v1.AckRequest\x1a\x16.source.v1.AckResponse(\x01\x30\x01\x12\x39\n\x06NackFn\x12\x16.source.v1.NackRequest\x1a\x17.source.v1.NackResponse\x12?\n\tPendingFn\x12\x16.google.protobuf.Empty\x1a\x1a.source.v1.PendingResponse\x12\x45\n\x0cPartitionsFn\x12\x16.google.protobuf.Empty\x1a\x1d.source.v1.PartitionsResponse\x12;\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x18.source.v1.ReadyResponseb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -36,52 +37,52 @@ DESCRIPTOR._loaded_options = None _globals['_READRESPONSE_RESULT_HEADERSENTRY']._loaded_options = None _globals['_READRESPONSE_RESULT_HEADERSENTRY']._serialized_options = b'8\001' - _globals['_HANDSHAKE']._serialized_start=154 - _globals['_HANDSHAKE']._serialized_end=178 - _globals['_READREQUEST']._serialized_start=181 - _globals['_READREQUEST']._serialized_end=358 - _globals['_READREQUEST_REQUEST']._serialized_start=291 - _globals['_READREQUEST_REQUEST']._serialized_end=344 - _globals['_READRESPONSE']._serialized_start=361 - _globals['_READRESPONSE']._serialized_end=1038 - _globals['_READRESPONSE_RESULT']._serialized_start=520 - _globals['_READRESPONSE_RESULT']._serialized_end=788 - _globals['_READRESPONSE_RESULT_HEADERSENTRY']._serialized_start=742 - _globals['_READRESPONSE_RESULT_HEADERSENTRY']._serialized_end=788 - _globals['_READRESPONSE_STATUS']._serialized_start=791 - _globals['_READRESPONSE_STATUS']._serialized_end=1024 - _globals['_READRESPONSE_STATUS_CODE']._serialized_start=941 - _globals['_READRESPONSE_STATUS_CODE']._serialized_end=973 - _globals['_READRESPONSE_STATUS_ERROR']._serialized_start=975 - _globals['_READRESPONSE_STATUS_ERROR']._serialized_end=1006 - _globals['_ACKREQUEST']._serialized_start=1041 - _globals['_ACKREQUEST']._serialized_end=1208 - _globals['_ACKREQUEST_REQUEST']._serialized_start=1149 - _globals['_ACKREQUEST_REQUEST']._serialized_end=1194 - _globals['_ACKRESPONSE']._serialized_start=1211 - _globals['_ACKRESPONSE']._serialized_end=1382 - _globals['_ACKRESPONSE_RESULT']._serialized_start=1319 - _globals['_ACKRESPONSE_RESULT']._serialized_end=1368 - _globals['_NACKREQUEST']._serialized_start=1384 - _globals['_NACKREQUEST']._serialized_end=1493 - _globals['_NACKREQUEST_REQUEST']._serialized_start=1149 - _globals['_NACKREQUEST_REQUEST']._serialized_end=1194 - _globals['_NACKRESPONSE']._serialized_start=1495 - _globals['_NACKRESPONSE']._serialized_end=1608 - _globals['_NACKRESPONSE_RESULT']._serialized_start=1319 - _globals['_NACKRESPONSE_RESULT']._serialized_end=1368 - _globals['_READYRESPONSE']._serialized_start=1610 - _globals['_READYRESPONSE']._serialized_end=1640 - _globals['_PENDINGRESPONSE']._serialized_start=1642 - _globals['_PENDINGRESPONSE']._serialized_end=1735 - _globals['_PENDINGRESPONSE_RESULT']._serialized_start=1712 - _globals['_PENDINGRESPONSE_RESULT']._serialized_end=1735 - _globals['_PARTITIONSRESPONSE']._serialized_start=1738 - _globals['_PARTITIONSRESPONSE']._serialized_end=1894 - _globals['_PARTITIONSRESPONSE_RESULT']._serialized_start=1814 - _globals['_PARTITIONSRESPONSE_RESULT']._serialized_end=1894 - _globals['_OFFSET']._serialized_start=1896 - _globals['_OFFSET']._serialized_end=1942 - _globals['_SOURCE']._serialized_start=1945 - _globals['_SOURCE']._serialized_end=2332 + _globals['_HANDSHAKE']._serialized_start=198 + _globals['_HANDSHAKE']._serialized_end=222 + _globals['_READREQUEST']._serialized_start=225 + _globals['_READREQUEST']._serialized_end=402 + _globals['_READREQUEST_REQUEST']._serialized_start=335 + _globals['_READREQUEST_REQUEST']._serialized_end=388 + _globals['_READRESPONSE']._serialized_start=405 + _globals['_READRESPONSE']._serialized_end=1082 + _globals['_READRESPONSE_RESULT']._serialized_start=564 + _globals['_READRESPONSE_RESULT']._serialized_end=832 + _globals['_READRESPONSE_RESULT_HEADERSENTRY']._serialized_start=786 + _globals['_READRESPONSE_RESULT_HEADERSENTRY']._serialized_end=832 + _globals['_READRESPONSE_STATUS']._serialized_start=835 + _globals['_READRESPONSE_STATUS']._serialized_end=1068 + _globals['_READRESPONSE_STATUS_CODE']._serialized_start=985 + _globals['_READRESPONSE_STATUS_CODE']._serialized_end=1017 + _globals['_READRESPONSE_STATUS_ERROR']._serialized_start=1019 + _globals['_READRESPONSE_STATUS_ERROR']._serialized_end=1050 + _globals['_ACKREQUEST']._serialized_start=1085 + _globals['_ACKREQUEST']._serialized_end=1252 + _globals['_ACKREQUEST_REQUEST']._serialized_start=1193 + _globals['_ACKREQUEST_REQUEST']._serialized_end=1238 + _globals['_ACKRESPONSE']._serialized_start=1255 + _globals['_ACKRESPONSE']._serialized_end=1426 + _globals['_ACKRESPONSE_RESULT']._serialized_start=1363 + _globals['_ACKRESPONSE_RESULT']._serialized_end=1412 + _globals['_NACKREQUEST']._serialized_start=1429 + _globals['_NACKREQUEST']._serialized_end=1603 + _globals['_NACKREQUEST_REQUEST']._serialized_start=1493 + _globals['_NACKREQUEST_REQUEST']._serialized_end=1603 + _globals['_NACKRESPONSE']._serialized_start=1605 + _globals['_NACKRESPONSE']._serialized_end=1718 + _globals['_NACKRESPONSE_RESULT']._serialized_start=1363 + _globals['_NACKRESPONSE_RESULT']._serialized_end=1412 + _globals['_READYRESPONSE']._serialized_start=1720 + _globals['_READYRESPONSE']._serialized_end=1750 + _globals['_PENDINGRESPONSE']._serialized_start=1752 + _globals['_PENDINGRESPONSE']._serialized_end=1845 + _globals['_PENDINGRESPONSE_RESULT']._serialized_start=1822 + _globals['_PENDINGRESPONSE_RESULT']._serialized_end=1845 + _globals['_PARTITIONSRESPONSE']._serialized_start=1848 + _globals['_PARTITIONSRESPONSE']._serialized_end=2004 + _globals['_PARTITIONSRESPONSE_RESULT']._serialized_start=1924 + _globals['_PARTITIONSRESPONSE_RESULT']._serialized_end=2004 + _globals['_OFFSET']._serialized_start=2006 + _globals['_OFFSET']._serialized_end=2052 + _globals['_SOURCE']._serialized_start=2055 + _globals['_SOURCE']._serialized_end=2442 # @@protoc_insertion_point(module_scope) diff --git a/packages/pynumaflow/pynumaflow/proto/sourcer/source_pb2.pyi b/packages/pynumaflow/pynumaflow/proto/sourcer/source_pb2.pyi index c347cfc5..b280f0fb 100644 --- a/packages/pynumaflow/pynumaflow/proto/sourcer/source_pb2.pyi +++ b/packages/pynumaflow/pynumaflow/proto/sourcer/source_pb2.pyi @@ -3,6 +3,7 @@ import datetime from google.protobuf import timestamp_pb2 as _timestamp_pb2 from google.protobuf import empty_pb2 as _empty_pb2 from pynumaflow.proto.common import metadata_pb2 as _metadata_pb2 +from pynumaflow.proto.common import nack_options_pb2 as _nack_options_pb2 from google.protobuf.internal import containers as _containers from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper from google.protobuf import descriptor as _descriptor @@ -117,10 +118,12 @@ class AckResponse(_message.Message): class NackRequest(_message.Message): __slots__ = ("request",) class Request(_message.Message): - __slots__ = ("offsets",) + __slots__ = ("offsets", "nack_options") OFFSETS_FIELD_NUMBER: _ClassVar[int] + NACK_OPTIONS_FIELD_NUMBER: _ClassVar[int] offsets: _containers.RepeatedCompositeFieldContainer[Offset] - def __init__(self, offsets: _Optional[_Iterable[_Union[Offset, _Mapping]]] = ...) -> None: ... + nack_options: _nack_options_pb2.NackOptions + def __init__(self, offsets: _Optional[_Iterable[_Union[Offset, _Mapping]]] = ..., nack_options: _Optional[_Union[_nack_options_pb2.NackOptions, _Mapping]] = ...) -> None: ... REQUEST_FIELD_NUMBER: _ClassVar[int] request: NackRequest.Request def __init__(self, request: _Optional[_Union[NackRequest.Request, _Mapping]] = ...) -> None: ... diff --git a/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform.proto b/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform.proto index fb346167..063d4240 100644 --- a/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform.proto +++ b/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform.proto @@ -3,6 +3,7 @@ syntax = "proto3"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; import "pynumaflow/proto/common/metadata.proto"; +import "pynumaflow/proto/common/nack_options.proto"; package sourcetransformer.v1; @@ -54,6 +55,7 @@ message SourceTransformResponse { repeated string tags = 4; // metadata of the message common.Metadata metadata = 5; + optional common.NackOptions nack_options = 6; } repeated Result results = 1; // This ID is used to refer the responses to the request it corresponds to. diff --git a/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform_pb2.py b/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform_pb2.py index 60077698..1cd00d5b 100644 --- a/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform_pb2.py +++ b/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform_pb2.py @@ -25,9 +25,10 @@ from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 from pynumaflow.proto.common import metadata_pb2 as pynumaflow_dot_proto_dot_common_dot_metadata__pb2 +from pynumaflow.proto.common import nack_options_pb2 as pynumaflow_dot_proto_dot_common_dot_nack__options__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n2pynumaflow/proto/sourcetransformer/transform.proto\x12\x14sourcetransformer.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a&pynumaflow/proto/common/metadata.proto\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"\xe2\x03\n\x16SourceTransformRequest\x12\x45\n\x07request\x18\x01 \x01(\x0b\x32\x34.sourcetransformer.v1.SourceTransformRequest.Request\x12\x37\n\thandshake\x18\x02 \x01(\x0b\x32\x1f.sourcetransformer.v1.HandshakeH\x00\x88\x01\x01\x1a\xb9\x02\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12R\n\x07headers\x18\x05 \x03(\x0b\x32\x41.sourcetransformer.v1.SourceTransformRequest.Request.HeadersEntry\x12\n\n\x02id\x18\x06 \x01(\t\x12\"\n\x08metadata\x18\x07 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshake\"\xbd\x02\n\x17SourceTransformResponse\x12\x45\n\x07results\x18\x01 \x03(\x0b\x32\x34.sourcetransformer.v1.SourceTransformResponse.Result\x12\n\n\x02id\x18\x02 \x01(\t\x12\x37\n\thandshake\x18\x03 \x01(\x0b\x32\x1f.sourcetransformer.v1.HandshakeH\x00\x88\x01\x01\x1a\x87\x01\n\x06Result\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04tags\x18\x04 \x03(\t\x12\"\n\x08metadata\x18\x05 \x01(\x0b\x32\x10.common.MetadataB\x0c\n\n_handshake\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\x32\xcf\x01\n\x0fSourceTransform\x12t\n\x11SourceTransformFn\x12,.sourcetransformer.v1.SourceTransformRequest\x1a-.sourcetransformer.v1.SourceTransformResponse(\x01\x30\x01\x12\x46\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a#.sourcetransformer.v1.ReadyResponseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n2pynumaflow/proto/sourcetransformer/transform.proto\x12\x14sourcetransformer.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a&pynumaflow/proto/common/metadata.proto\x1a*pynumaflow/proto/common/nack_options.proto\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"\xe2\x03\n\x16SourceTransformRequest\x12\x45\n\x07request\x18\x01 \x01(\x0b\x32\x34.sourcetransformer.v1.SourceTransformRequest.Request\x12\x37\n\thandshake\x18\x02 \x01(\x0b\x32\x1f.sourcetransformer.v1.HandshakeH\x00\x88\x01\x01\x1a\xb9\x02\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12R\n\x07headers\x18\x05 \x03(\x0b\x32\x41.sourcetransformer.v1.SourceTransformRequest.Request.HeadersEntry\x12\n\n\x02id\x18\x06 \x01(\t\x12\"\n\x08metadata\x18\x07 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshake\"\xfe\x02\n\x17SourceTransformResponse\x12\x45\n\x07results\x18\x01 \x03(\x0b\x32\x34.sourcetransformer.v1.SourceTransformResponse.Result\x12\n\n\x02id\x18\x02 \x01(\t\x12\x37\n\thandshake\x18\x03 \x01(\x0b\x32\x1f.sourcetransformer.v1.HandshakeH\x00\x88\x01\x01\x1a\xc8\x01\n\x06Result\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04tags\x18\x04 \x03(\t\x12\"\n\x08metadata\x18\x05 \x01(\x0b\x32\x10.common.Metadata\x12.\n\x0cnack_options\x18\x06 \x01(\x0b\x32\x13.common.NackOptionsH\x00\x88\x01\x01\x42\x0f\n\r_nack_optionsB\x0c\n\n_handshake\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\x32\xcf\x01\n\x0fSourceTransform\x12t\n\x11SourceTransformFn\x12,.sourcetransformer.v1.SourceTransformRequest\x1a-.sourcetransformer.v1.SourceTransformResponse(\x01\x30\x01\x12\x46\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a#.sourcetransformer.v1.ReadyResponseb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -36,20 +37,20 @@ DESCRIPTOR._loaded_options = None _globals['_SOURCETRANSFORMREQUEST_REQUEST_HEADERSENTRY']._loaded_options = None _globals['_SOURCETRANSFORMREQUEST_REQUEST_HEADERSENTRY']._serialized_options = b'8\001' - _globals['_HANDSHAKE']._serialized_start=178 - _globals['_HANDSHAKE']._serialized_end=202 - _globals['_SOURCETRANSFORMREQUEST']._serialized_start=205 - _globals['_SOURCETRANSFORMREQUEST']._serialized_end=687 - _globals['_SOURCETRANSFORMREQUEST_REQUEST']._serialized_start=360 - _globals['_SOURCETRANSFORMREQUEST_REQUEST']._serialized_end=673 - _globals['_SOURCETRANSFORMREQUEST_REQUEST_HEADERSENTRY']._serialized_start=627 - _globals['_SOURCETRANSFORMREQUEST_REQUEST_HEADERSENTRY']._serialized_end=673 - _globals['_SOURCETRANSFORMRESPONSE']._serialized_start=690 - _globals['_SOURCETRANSFORMRESPONSE']._serialized_end=1007 - _globals['_SOURCETRANSFORMRESPONSE_RESULT']._serialized_start=858 - _globals['_SOURCETRANSFORMRESPONSE_RESULT']._serialized_end=993 - _globals['_READYRESPONSE']._serialized_start=1009 - _globals['_READYRESPONSE']._serialized_end=1039 - _globals['_SOURCETRANSFORM']._serialized_start=1042 - _globals['_SOURCETRANSFORM']._serialized_end=1249 + _globals['_HANDSHAKE']._serialized_start=222 + _globals['_HANDSHAKE']._serialized_end=246 + _globals['_SOURCETRANSFORMREQUEST']._serialized_start=249 + _globals['_SOURCETRANSFORMREQUEST']._serialized_end=731 + _globals['_SOURCETRANSFORMREQUEST_REQUEST']._serialized_start=404 + _globals['_SOURCETRANSFORMREQUEST_REQUEST']._serialized_end=717 + _globals['_SOURCETRANSFORMREQUEST_REQUEST_HEADERSENTRY']._serialized_start=671 + _globals['_SOURCETRANSFORMREQUEST_REQUEST_HEADERSENTRY']._serialized_end=717 + _globals['_SOURCETRANSFORMRESPONSE']._serialized_start=734 + _globals['_SOURCETRANSFORMRESPONSE']._serialized_end=1116 + _globals['_SOURCETRANSFORMRESPONSE_RESULT']._serialized_start=902 + _globals['_SOURCETRANSFORMRESPONSE_RESULT']._serialized_end=1102 + _globals['_READYRESPONSE']._serialized_start=1118 + _globals['_READYRESPONSE']._serialized_end=1148 + _globals['_SOURCETRANSFORM']._serialized_start=1151 + _globals['_SOURCETRANSFORM']._serialized_end=1358 # @@protoc_insertion_point(module_scope) diff --git a/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform_pb2.pyi b/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform_pb2.pyi index 99b452a6..8fe20dc0 100644 --- a/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform_pb2.pyi +++ b/packages/pynumaflow/pynumaflow/proto/sourcetransformer/transform_pb2.pyi @@ -3,6 +3,7 @@ import datetime from google.protobuf import timestamp_pb2 as _timestamp_pb2 from google.protobuf import empty_pb2 as _empty_pb2 from pynumaflow.proto.common import metadata_pb2 as _metadata_pb2 +from pynumaflow.proto.common import nack_options_pb2 as _nack_options_pb2 from google.protobuf.internal import containers as _containers from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message @@ -52,18 +53,20 @@ class SourceTransformRequest(_message.Message): class SourceTransformResponse(_message.Message): __slots__ = ("results", "id", "handshake") class Result(_message.Message): - __slots__ = ("keys", "value", "event_time", "tags", "metadata") + __slots__ = ("keys", "value", "event_time", "tags", "metadata", "nack_options") KEYS_FIELD_NUMBER: _ClassVar[int] VALUE_FIELD_NUMBER: _ClassVar[int] EVENT_TIME_FIELD_NUMBER: _ClassVar[int] TAGS_FIELD_NUMBER: _ClassVar[int] METADATA_FIELD_NUMBER: _ClassVar[int] + NACK_OPTIONS_FIELD_NUMBER: _ClassVar[int] keys: _containers.RepeatedScalarFieldContainer[str] value: bytes event_time: _timestamp_pb2.Timestamp tags: _containers.RepeatedScalarFieldContainer[str] metadata: _metadata_pb2.Metadata - def __init__(self, keys: _Optional[_Iterable[str]] = ..., value: _Optional[bytes] = ..., event_time: _Optional[_Union[datetime.datetime, _timestamp_pb2.Timestamp, _Mapping]] = ..., tags: _Optional[_Iterable[str]] = ..., metadata: _Optional[_Union[_metadata_pb2.Metadata, _Mapping]] = ...) -> None: ... + nack_options: _nack_options_pb2.NackOptions + def __init__(self, keys: _Optional[_Iterable[str]] = ..., value: _Optional[bytes] = ..., event_time: _Optional[_Union[datetime.datetime, _timestamp_pb2.Timestamp, _Mapping]] = ..., tags: _Optional[_Iterable[str]] = ..., metadata: _Optional[_Union[_metadata_pb2.Metadata, _Mapping]] = ..., nack_options: _Optional[_Union[_nack_options_pb2.NackOptions, _Mapping]] = ...) -> None: ... RESULTS_FIELD_NUMBER: _ClassVar[int] ID_FIELD_NUMBER: _ClassVar[int] HANDSHAKE_FIELD_NUMBER: _ClassVar[int] diff --git a/packages/pynumaflow/pynumaflow/sinker/__init__.py b/packages/pynumaflow/pynumaflow/sinker/__init__.py index 5a67f752..8774d6be 100644 --- a/packages/pynumaflow/pynumaflow/sinker/__init__.py +++ b/packages/pynumaflow/pynumaflow/sinker/__init__.py @@ -4,6 +4,7 @@ from pynumaflow._metadata import UserMetadata, SystemMetadata from pynumaflow.sinker._dtypes import Response, Responses, Datum, Sinker, Message +from pynumaflow._nack import NackOptions __all__ = [ "Message", @@ -15,4 +16,5 @@ "SinkServer", "UserMetadata", "SystemMetadata", + "NackOptions", ] diff --git a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py index 656fed7f..339c5778 100644 --- a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py @@ -7,6 +7,7 @@ from warnings import warn from pynumaflow._metadata import SystemMetadata, UserMetadata +from pynumaflow._nack import NackOptions from pynumaflow._validate import _validate_message_fields R = TypeVar("R", bound="Response") @@ -83,15 +84,18 @@ class Response: fallback: bool on_success: bool on_success_msg: Message | None + nack: bool + nack_options: NackOptions | None - __slots__ = ("id", "success", "err", "fallback", "on_success", "on_success_msg") + __slots__ = ("id", "success", "err", "fallback", "on_success", "on_success_msg", "nack", "nack_options") # as_success creates a successful Response with the given id. # The Success field is set to true. @classmethod def as_success(cls, id_: str) -> "Response": return Response( - id=id_, success=True, err=None, fallback=False, on_success=False, on_success_msg=None + id=id_, success=True, err=None, fallback=False, on_success=False, on_success_msg=None, nack=False, + nack_options=None, ) # as_failure creates a failed Response with the given id and error message. @@ -105,6 +109,8 @@ def as_failure(cls, id_: str, err_msg: str) -> "Response": fallback=False, on_success=False, on_success_msg=None, + nack=False, + nack_options=None, ) # as_fallback creates a Response with the fallback field set to true. @@ -112,7 +118,8 @@ def as_failure(cls, id_: str, err_msg: str) -> "Response": @classmethod def as_fallback(cls, id_: str) -> "Response": return Response( - id=id_, fallback=True, err=None, success=False, on_success=False, on_success_msg=None + id=id_, fallback=True, err=None, success=False, on_success=False, on_success_msg=None, nack=False, + nack_options=None, ) # as_on_success creates a Response with the on_success field set to true. @@ -126,6 +133,21 @@ def as_on_success(cls, id_: str, on_success: Message | None = None) -> "Response success=False, on_success=True, on_success_msg=on_success, + nack=False, + nack_options=None, + ) + + @classmethod + def as_nack(cls, id_: str, opts: NackOptions | None = None) -> "Response": + return Response( + id=id_, + success=False, + err=None, + fallback=False, + on_success=False, + on_success_msg=None, + nack=True, + nack_options=opts, ) diff --git a/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py b/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py index 92f93da6..9311bea0 100644 --- a/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py +++ b/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py @@ -1,4 +1,5 @@ from pynumaflow._metadata import _user_and_system_metadata_from_proto +from pynumaflow._nack import _nack_options_to_proto from pynumaflow.proto.sinker import sink_pb2 from pynumaflow.sinker._dtypes import Response, Datum, Responses, Message @@ -38,6 +39,12 @@ def build_sink_response(rspn: Response) -> sink_pb2.SinkResponse.Result: status=sink_pb2.Status.ON_SUCCESS, on_success_msg=build_on_success_message(rspn.on_success_msg), ) + elif rspn.nack: + return sink_pb2.SinkResponse.Result( + id=rid, + status=sink_pb2.Status.NACK, + nack_options=_nack_options_to_proto(rspn.nack_options), + ) else: return sink_pb2.SinkResponse.Result( id=rid, status=sink_pb2.Status.FAILURE, err_msg=rspn.err diff --git a/packages/pynumaflow/pynumaflow/sourcer/__init__.py b/packages/pynumaflow/pynumaflow/sourcer/__init__.py index 73b62735..46023eea 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/__init__.py +++ b/packages/pynumaflow/pynumaflow/sourcer/__init__.py @@ -12,6 +12,7 @@ ) from pynumaflow._metadata import UserMetadata from pynumaflow.sourcer.async_server import SourceAsyncServer +from pynumaflow._nack import NackOptions __all__ = [ "Message", @@ -26,4 +27,5 @@ "SourceAsyncServer", "SourceCallable", "UserMetadata", + "NackOptions", ] diff --git a/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py b/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py index f9e9bf78..dbe043a5 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py @@ -6,6 +6,7 @@ from typing import TypeAlias from pynumaflow._metadata import UserMetadata +from pynumaflow._nack import NackOptions from pynumaflow._validate import _validate_message_fields from pynumaflow.shared.asynciter import NonBlockingIterator @@ -200,17 +201,26 @@ class NackRequest: ``` """ - __slots__ = ("_offsets",) + __slots__ = ("_offsets", "_nack_options") _offsets: list[Offset] + _nack_options: NackOptions | None - def __init__(self, offsets: list[Offset]): + def __init__( + self, + offsets: list[Offset], + nack_options: NackOptions | None = None, + ): self._offsets = offsets + self._nack_options = nack_options @property def offsets(self) -> list[Offset]: - """Returns the offsets to be negatively acknowledged.""" return self._offsets + @property + def nack_options(self) -> NackOptions | None: + return self._nack_options + @dataclass(init=False, slots=True) class PendingResponse: diff --git a/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py b/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py index 77ca94cc..8f204137 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py +++ b/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py @@ -12,6 +12,7 @@ from pynumaflow.sourcer._dtypes import Message from pynumaflow.types import NumaflowServicerContext from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING +from pynumaflow._nack import _nack_options_from_proto def _create_read_handshake_response(): @@ -219,7 +220,12 @@ async def NackFn( offsets = [ Offset(offset.offset, offset.partition_id) for offset in request.request.offsets ] - await self.__source_nack_handler(NackRequest(offsets=offsets)) + opts = ( + _nack_options_from_proto(request.request.nack_options) + if request.request.HasField("nack_options") + else None + ) + await self.__source_nack_handler(NackRequest(offsets=offsets, nack_options=opts)) except asyncio.CancelledError: # Task cancelled during shutdown (e.g. SIGTERM) — not a UDF fault. _LOGGER.info("Server shutting down, cancelling RPC.") diff --git a/packages/pynumaflow/pynumaflow/sourcetransformer/__init__.py b/packages/pynumaflow/pynumaflow/sourcetransformer/__init__.py index f029c6ef..d6dccf73 100644 --- a/packages/pynumaflow/pynumaflow/sourcetransformer/__init__.py +++ b/packages/pynumaflow/pynumaflow/sourcetransformer/__init__.py @@ -9,6 +9,7 @@ from pynumaflow.sourcetransformer.server import SourceTransformServer from pynumaflow.sourcetransformer.async_server import SourceTransformAsyncServer from pynumaflow._metadata import UserMetadata, SystemMetadata +from pynumaflow._nack import NackOptions __all__ = [ "Message", @@ -21,4 +22,5 @@ "SourceTransformAsyncServer", "UserMetadata", "SystemMetadata", + "NackOptions", ] diff --git a/packages/pynumaflow/pynumaflow/sourcetransformer/_dtypes.py b/packages/pynumaflow/pynumaflow/sourcetransformer/_dtypes.py index 66ffb195..78e1398e 100644 --- a/packages/pynumaflow/pynumaflow/sourcetransformer/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sourcetransformer/_dtypes.py @@ -6,7 +6,8 @@ from collections.abc import Awaitable, Callable from warnings import warn -from pynumaflow._constants import DROP +from pynumaflow._constants import DROP, NACK +from pynumaflow._nack import NackOptions from pynumaflow._metadata import UserMetadata, SystemMetadata from pynumaflow._validate import _validate_message_fields @@ -32,6 +33,7 @@ class Message: _value: bytes _event_time: datetime _user_metadata: UserMetadata + _nack_options: NackOptions | None def __init__( self, @@ -52,11 +54,26 @@ def __init__( self._event_time = event_time or datetime(1, 1, 1, 0, 0) self._value = value or b"" self._user_metadata = user_metadata or UserMetadata() + self._nack_options = None @classmethod def to_drop(cls: type[M], event_time: datetime) -> M: return cls(b"", event_time, None, [DROP]) + @classmethod + def to_nack( + cls: type[M], + event_time: datetime, + opts: NackOptions | None = None, + ) -> M: + m = cls(b"", event_time, None, [NACK]) + m._nack_options = opts + return m + + @property + def nack_options(self) -> NackOptions | None: + return self._nack_options + @property def event_time(self) -> datetime: return self._event_time diff --git a/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_async_servicer.py b/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_async_servicer.py index 819c27c3..e247cc31 100644 --- a/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_async_servicer.py +++ b/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_async_servicer.py @@ -6,6 +6,7 @@ from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING from pynumaflow._metadata import _user_and_system_metadata_from_proto +from pynumaflow._nack import _nack_options_to_proto from pynumaflow.proto.sourcetransformer import transform_pb2, transform_pb2_grpc from pynumaflow.shared.asynciter import NonBlockingIterator from pynumaflow.shared.server import update_context_err @@ -154,6 +155,7 @@ async def _invoke_transform( tags=msg.tags, event_time=event_time_timestamp, metadata=msg.user_metadata._to_proto(), + nack_options=_nack_options_to_proto(msg.nack_options), ) ) await result_queue.put( diff --git a/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_servicer.py b/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_servicer.py index 3945e194..513360b8 100644 --- a/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_servicer.py +++ b/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_servicer.py @@ -6,6 +6,7 @@ from google.protobuf import empty_pb2 as _empty_pb2 from google.protobuf import timestamp_pb2 as _timestamp_pb2 +from pynumaflow._nack import _nack_options_to_proto from pynumaflow.shared.server import update_context_err from pynumaflow.shared.synciter import SyncIterator from pynumaflow.sourcetransformer import Datum @@ -174,6 +175,7 @@ def _invoke_transformer( tags=resp.tags, event_time=event_time_timestamp, metadata=resp.user_metadata._to_proto(), + nack_options=_nack_options_to_proto(msg.nack_options), ) ) result_queue.put( From b781a19c9e395d7f5200782c8829fdca7466359d Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Wed, 1 Jul 2026 13:04:11 -0700 Subject: [PATCH 2/4] fix lint Signed-off-by: Abdullah Yildirim --- packages/pynumaflow/pynumaflow/_nack.py | 2 +- .../batchmapper/servicer/async_servicer.py | 5 +++- .../mapstreamer/servicer/async_servicer.py | 7 ++++- .../pynumaflow/pynumaflow/sinker/_dtypes.py | 27 ++++++++++++++++--- .../pynumaflow/pynumaflow/sourcer/_dtypes.py | 6 ++--- .../pynumaflow/sourcetransformer/_dtypes.py | 6 ++--- .../sourcetransformer/servicer/_servicer.py | 2 +- 7 files changed, 42 insertions(+), 13 deletions(-) diff --git a/packages/pynumaflow/pynumaflow/_nack.py b/packages/pynumaflow/pynumaflow/_nack.py index e5c2ba59..64a7418d 100644 --- a/packages/pynumaflow/pynumaflow/_nack.py +++ b/packages/pynumaflow/pynumaflow/_nack.py @@ -36,4 +36,4 @@ def _nack_options_from_proto( delay=proto.delay if proto.HasField("delay") else None, max_deliveries=proto.max_deliveries if proto.HasField("max_deliveries") else None, reason=proto.reason if proto.HasField("reason") else None, - ) \ No newline at end of file + ) diff --git a/packages/pynumaflow/pynumaflow/batchmapper/servicer/async_servicer.py b/packages/pynumaflow/pynumaflow/batchmapper/servicer/async_servicer.py index 5706cc1c..815276f7 100644 --- a/packages/pynumaflow/pynumaflow/batchmapper/servicer/async_servicer.py +++ b/packages/pynumaflow/pynumaflow/batchmapper/servicer/async_servicer.py @@ -81,7 +81,10 @@ async def MapFn( for msg in batch_response.messages: single_req_resp.append( map_pb2.MapResponse.Result( - keys=msg.keys, value=msg.value, tags=msg.tags, nack_options=_nack_options_to_proto(msg.nack_options) + keys=msg.keys, + value=msg.value, + tags=msg.tags, + nack_options=_nack_options_to_proto(msg.nack_options), ) ) # send the response for a given ID back to the stream diff --git a/packages/pynumaflow/pynumaflow/mapstreamer/servicer/async_servicer.py b/packages/pynumaflow/pynumaflow/mapstreamer/servicer/async_servicer.py index e70c3126..97ac2980 100644 --- a/packages/pynumaflow/pynumaflow/mapstreamer/servicer/async_servicer.py +++ b/packages/pynumaflow/pynumaflow/mapstreamer/servicer/async_servicer.py @@ -159,7 +159,12 @@ async def _invoke_map_stream( # same time in the next vertex instead of in a true streaming fashion. # The asyncio.sleep(0) will yield the control back to event loop avoiding starvation. async for msg in self.__map_stream_handler(list(req.request.keys), datum): - res = map_pb2.MapResponse.Result(keys=msg.keys, value=msg.value, tags=msg.tags, nack_options=_nack_options_to_proto(msg.nack_options)) + res = map_pb2.MapResponse.Result( + keys=msg.keys, + value=msg.value, + tags=msg.tags, + nack_options=_nack_options_to_proto(msg.nack_options), + ) await result_queue.put(map_pb2.MapResponse(results=[res], id=req.id)) await asyncio.sleep(0) diff --git a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py index 339c5778..afada023 100644 --- a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py @@ -87,14 +87,29 @@ class Response: nack: bool nack_options: NackOptions | None - __slots__ = ("id", "success", "err", "fallback", "on_success", "on_success_msg", "nack", "nack_options") + __slots__ = ( + "id", + "success", + "err", + "fallback", + "on_success", + "on_success_msg", + "nack", + "nack_options", + ) # as_success creates a successful Response with the given id. # The Success field is set to true. @classmethod def as_success(cls, id_: str) -> "Response": return Response( - id=id_, success=True, err=None, fallback=False, on_success=False, on_success_msg=None, nack=False, + id=id_, + success=True, + err=None, + fallback=False, + on_success=False, + on_success_msg=None, + nack=False, nack_options=None, ) @@ -118,7 +133,13 @@ def as_failure(cls, id_: str, err_msg: str) -> "Response": @classmethod def as_fallback(cls, id_: str) -> "Response": return Response( - id=id_, fallback=True, err=None, success=False, on_success=False, on_success_msg=None, nack=False, + id=id_, + fallback=True, + err=None, + success=False, + on_success=False, + on_success_msg=None, + nack=False, nack_options=None, ) diff --git a/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py b/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py index dbe043a5..3b333af5 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py @@ -206,9 +206,9 @@ class NackRequest: _nack_options: NackOptions | None def __init__( - self, - offsets: list[Offset], - nack_options: NackOptions | None = None, + self, + offsets: list[Offset], + nack_options: NackOptions | None = None, ): self._offsets = offsets self._nack_options = nack_options diff --git a/packages/pynumaflow/pynumaflow/sourcetransformer/_dtypes.py b/packages/pynumaflow/pynumaflow/sourcetransformer/_dtypes.py index 78e1398e..b9b2f07f 100644 --- a/packages/pynumaflow/pynumaflow/sourcetransformer/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sourcetransformer/_dtypes.py @@ -62,9 +62,9 @@ def to_drop(cls: type[M], event_time: datetime) -> M: @classmethod def to_nack( - cls: type[M], - event_time: datetime, - opts: NackOptions | None = None, + cls: type[M], + event_time: datetime, + opts: NackOptions | None = None, ) -> M: m = cls(b"", event_time, None, [NACK]) m._nack_options = opts diff --git a/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_servicer.py b/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_servicer.py index 513360b8..214f6ec0 100644 --- a/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_servicer.py +++ b/packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_servicer.py @@ -175,7 +175,7 @@ def _invoke_transformer( tags=resp.tags, event_time=event_time_timestamp, metadata=resp.user_metadata._to_proto(), - nack_options=_nack_options_to_proto(msg.nack_options), + nack_options=_nack_options_to_proto(resp.nack_options), ) ) result_queue.put( From 7949dddaa66f6720252a4fb6a0c404e6970d81a4 Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Wed, 1 Jul 2026 13:09:02 -0700 Subject: [PATCH 3/4] fix test Signed-off-by: Abdullah Yildirim --- packages/pynumaflow/tests/sink/test_responses.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/pynumaflow/tests/sink/test_responses.py b/packages/pynumaflow/tests/sink/test_responses.py index 54bc5682..cb37d0d9 100644 --- a/packages/pynumaflow/tests/sink/test_responses.py +++ b/packages/pynumaflow/tests/sink/test_responses.py @@ -49,18 +49,20 @@ def test_responses(): assert ( "[Response(id='2', success=True, err=None, fallback=False, " - "on_success=False, on_success_msg=None), " + "on_success=False, on_success_msg=None, nack=False, nack_options=None), " "Response(id='3', success=False, err='RuntimeError encountered!', " - "fallback=False, on_success=False, on_success_msg=None), " + "fallback=False, on_success=False, on_success_msg=None, nack=False, " + "nack_options=None), " "Response(id='5', success=False, err=None, fallback=True, " - "on_success=False, on_success_msg=None), " + "on_success=False, on_success_msg=None, nack=False, nack_options=None), " "Response(id='4', success=True, err=None, fallback=False, " - "on_success=False, on_success_msg=None), " + "on_success=False, on_success_msg=None, nack=False, nack_options=None), " "Response(id='6', success=False, err=None, fallback=False, " "on_success=True, on_success_msg=Message(_keys=['key'], _value=b'value', " - "_user_metadata=UserMetadata(_data={}))), " + "_user_metadata=UserMetadata(_data={})), nack=False, nack_options=None), " "Response(id='7', success=False, err=None, fallback=False, " - "on_success=True, on_success_msg=None)]" == repr(resps) + "on_success=True, on_success_msg=None, nack=False, nack_options=None)]" + == repr(resps) ) From ada254d8cc979e65592ec0814af52eed228a8898 Mon Sep 17 00:00:00 2001 From: Abdullah Yildirim Date: Wed, 1 Jul 2026 13:09:44 -0700 Subject: [PATCH 4/4] fix lint Signed-off-by: Abdullah Yildirim --- packages/pynumaflow/tests/sink/test_responses.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/pynumaflow/tests/sink/test_responses.py b/packages/pynumaflow/tests/sink/test_responses.py index cb37d0d9..93141f35 100644 --- a/packages/pynumaflow/tests/sink/test_responses.py +++ b/packages/pynumaflow/tests/sink/test_responses.py @@ -61,8 +61,7 @@ def test_responses(): "on_success=True, on_success_msg=Message(_keys=['key'], _value=b'value', " "_user_metadata=UserMetadata(_data={})), nack=False, nack_options=None), " "Response(id='7', success=False, err=None, fallback=False, " - "on_success=True, on_success_msg=None, nack=False, nack_options=None)]" - == repr(resps) + "on_success=True, on_success_msg=None, nack=False, nack_options=None)]" == repr(resps) )