Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/pynumaflow/pynumaflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
STREAM_EOF = "EOF"
DELIMITER = ":"
DROP = "U+005C__DROP__"
NACK = "U+005C__NACK__"

_PROCESS_COUNT = os.cpu_count()
# Cap max value to 16
Expand Down
39 changes: 39 additions & 0 deletions packages/pynumaflow/pynumaflow/_nack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations

from dataclasses import dataclass

from pynumaflow.proto.common import nack_options_pb2


@dataclass
class NackOptions:
"""Per-message redelivery options for a nack."""

delay: int | None = None
max_deliveries: int | None = None
reason: str | None = None

def _to_proto(self) -> nack_options_pb2.NackOptions:
return nack_options_pb2.NackOptions(
reason=self.reason,
max_deliveries=self.max_deliveries,
delay=self.delay,
)


def _nack_options_to_proto(
opts: NackOptions | None,
) -> nack_options_pb2.NackOptions | None:
if opts is None:
return None
return opts._to_proto()


def _nack_options_from_proto(
proto: nack_options_pb2.NackOptions,
) -> NackOptions:
return NackOptions(
delay=proto.delay if proto.HasField("delay") else None,
max_deliveries=proto.max_deliveries if proto.HasField("max_deliveries") else None,
reason=proto.reason if proto.HasField("reason") else None,
)
2 changes: 2 additions & 0 deletions packages/pynumaflow/pynumaflow/batchmapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
BatchResponse,
)
from pynumaflow.batchmapper.async_server import BatchMapAsyncServer
from pynumaflow._nack import NackOptions

__all__ = [
"Message",
Expand All @@ -17,4 +18,5 @@
"BatchMapper",
"BatchResponses",
"BatchResponse",
"NackOptions",
]
15 changes: 14 additions & 1 deletion packages/pynumaflow/pynumaflow/batchmapper/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from typing import TypeAlias, TypeVar
from collections.abc import AsyncIterable, Callable

from pynumaflow._constants import DROP
from pynumaflow._constants import DROP, NACK
from pynumaflow._nack import NackOptions
from pynumaflow._validate import _validate_message_fields

M = TypeVar("M", bound="Message")
Expand All @@ -27,6 +28,7 @@ class Message:
_value: bytes
_keys: list[str]
_tags: list[str]
_nack_options: NackOptions | None

def __init__(self, value: bytes, keys: list[str] | None = None, tags: list[str] | None = None):
"""
Expand All @@ -36,12 +38,23 @@ def __init__(self, value: bytes, keys: list[str] | None = None, tags: list[str]
self._keys = keys or []
self._tags = tags or []
self._value = value or b""
self._nack_options = None

# returns the Message Object which will be dropped
@classmethod
def to_drop(cls: type[M]) -> M:
return cls(b"", None, [DROP])

@classmethod
def to_nack(cls: type[M], opts: NackOptions | None = None) -> M:
m = cls(b"", None, [NACK])
m._nack_options = opts
return m

@property
def nack_options(self) -> NackOptions | None:
return self._nack_options

@property
def value(self) -> bytes:
return self._value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pynumaflow.shared.server import update_context_err
from pynumaflow.types import NumaflowServicerContext
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING
from pynumaflow._nack import _nack_options_to_proto


class AsyncBatchMapServicer(map_pb2_grpc.MapServicer):
Expand Down Expand Up @@ -80,7 +81,7 @@ async def MapFn(
for msg in batch_response.messages:
single_req_resp.append(
map_pb2.MapResponse.Result(
keys=msg.keys, value=msg.value, tags=msg.tags
keys=msg.keys, value=msg.value, tags=msg.tags, nack_options=_nack_options_to_proto(msg.nack_options)
)
)
# send the response for a given ID back to the stream
Expand Down
2 changes: 2 additions & 0 deletions packages/pynumaflow/pynumaflow/mapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from pynumaflow.mapper._dtypes import Message, Messages, Datum, DROP, Mapper
from pynumaflow._metadata import UserMetadata, SystemMetadata
from pynumaflow._nack import NackOptions

__all__ = [
"Message",
Expand All @@ -16,4 +17,5 @@
"MapMultiprocServer",
"UserMetadata",
"SystemMetadata",
"NackOptions",
]
15 changes: 14 additions & 1 deletion packages/pynumaflow/pynumaflow/mapper/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from collections.abc import Callable
from warnings import warn

from pynumaflow._constants import DROP
from pynumaflow._constants import DROP, NACK
from pynumaflow._nack import NackOptions
from pynumaflow._metadata import UserMetadata, SystemMetadata
from pynumaflow._validate import _validate_message_fields

Expand All @@ -30,6 +31,7 @@ class Message:
_keys: list[str]
_tags: list[str]
_user_metadata: UserMetadata
_nack_options: NackOptions | None

def __init__(
self,
Expand All @@ -46,12 +48,23 @@ def __init__(
self._tags = tags or []
self._value = value or b""
self._user_metadata = user_metadata or UserMetadata()
self._nack_options = None

# returns the Message Object which will be dropped
@classmethod
def to_drop(cls: type[M]) -> M:
return cls(b"", None, [DROP])

@classmethod
def to_nack(cls: type[M], opts: NackOptions | None = None) -> M:
m = cls(b"", None, [NACK])
m._nack_options = opts
return m

@property
def nack_options(self) -> NackOptions | None:
return self._nack_options

@property
def value(self) -> bytes:
return self._value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING
from pynumaflow.mapper._dtypes import MapAsyncCallable, Datum, MapError, Message, Messages
from pynumaflow._metadata import _user_and_system_metadata_from_proto
from pynumaflow._nack import _nack_options_to_proto
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
from pynumaflow.shared.server import update_context_err
from pynumaflow.types import NumaflowServicerContext
Expand Down Expand Up @@ -143,6 +144,7 @@ async def _invoke_map(self, req: map_pb2.MapRequest, result_queue: NonBlockingIt
value=msg.value,
tags=msg.tags,
metadata=msg.user_metadata._to_proto(),
nack_options=_nack_options_to_proto(msg.nack_options),
)
)
await result_queue.put(map_pb2.MapResponse(results=datums, id=req.id))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from pynumaflow._constants import NUM_THREADS_DEFAULT, STREAM_EOF, _LOGGER, ERR_UDF_EXCEPTION_STRING
from pynumaflow.mapper._dtypes import MapSyncCallable, Datum, MapError
from pynumaflow._nack import _nack_options_to_proto
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
from pynumaflow.shared.synciter import SyncIterator
from pynumaflow.types import NumaflowServicerContext
Expand Down Expand Up @@ -155,6 +156,7 @@ def _invoke_map(
value=resp.value,
tags=resp.tags,
metadata=resp.user_metadata._to_proto(),
nack_options=_nack_options_to_proto(resp.nack_options),
)
)
result_queue.put(map_pb2.MapResponse(results=results, id=request.id))
Expand Down
2 changes: 2 additions & 0 deletions packages/pynumaflow/pynumaflow/mapstreamer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pynumaflow.mapstreamer._dtypes import Message, Messages, Datum, MapStreamer
from pynumaflow.mapstreamer.async_server import MapStreamAsyncServer
from pynumaflow._nack import NackOptions

__all__ = [
"Message",
Expand All @@ -10,4 +11,5 @@
"DROP",
"MapStreamAsyncServer",
"MapStreamer",
"NackOptions",
]
15 changes: 14 additions & 1 deletion packages/pynumaflow/pynumaflow/mapstreamer/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from collections.abc import AsyncIterable, Callable
from warnings import warn

from pynumaflow._constants import DROP
from pynumaflow._constants import DROP, NACK
from pynumaflow._nack import NackOptions
from pynumaflow._validate import _validate_message_fields

M = TypeVar("M", bound="Message")
Expand All @@ -27,6 +28,7 @@ class Message:
_value: bytes
_keys: list[str]
_tags: list[str]
_nack_options: NackOptions | None

def __init__(self, value: bytes, keys: list[str] | None = None, tags: list[str] | None = None):
"""
Expand All @@ -36,12 +38,23 @@ def __init__(self, value: bytes, keys: list[str] | None = None, tags: list[str]
self._keys = keys or []
self._tags = tags or []
self._value = value or b""
self._nack_options = None

# returns the Message Object which will be dropped
@classmethod
def to_drop(cls: type[M]) -> M:
return cls(b"", None, [DROP])

@classmethod
def to_nack(cls: type[M], opts: NackOptions | None = None) -> M:
m = cls(b"", None, [NACK])
m._nack_options = opts
return m

@property
def nack_options(self) -> NackOptions | None:
return self._nack_options

@property
def value(self) -> bytes:
return self._value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_UDF_EXCEPTION_STRING
from pynumaflow.mapstreamer import Datum
from pynumaflow.mapstreamer._dtypes import MapStreamCallable, MapStreamError
from pynumaflow._nack import _nack_options_to_proto
from pynumaflow.proto.mapper import map_pb2_grpc, map_pb2
from pynumaflow.shared.server import update_context_err
from pynumaflow.types import NumaflowServicerContext
Expand Down Expand Up @@ -158,7 +159,7 @@ async def _invoke_map_stream(
# same time in the next vertex instead of in a true streaming fashion.
# The asyncio.sleep(0) will yield the control back to event loop avoiding starvation.
async for msg in self.__map_stream_handler(list(req.request.keys), datum):
res = map_pb2.MapResponse.Result(keys=msg.keys, value=msg.value, tags=msg.tags)
res = map_pb2.MapResponse.Result(keys=msg.keys, value=msg.value, tags=msg.tags, nack_options=_nack_options_to_proto(msg.nack_options))
await result_queue.put(map_pb2.MapResponse(results=[res], id=req.id))
await asyncio.sleep(0)

Expand Down
13 changes: 13 additions & 0 deletions packages/pynumaflow/pynumaflow/proto/common/nack_options.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

package common;

// NackOptions carries per-message redelivery options for a negative acknowledgement (nack).
message NackOptions {
// reason is a human-readable reason for nacking the message.
optional string reason = 1;
// max_deliveries is the maximum number of redelivery attempts.
optional uint32 max_deliveries = 2;
// delay is the redelivery delay in milliseconds.
optional uint64 delay = 3;
}
36 changes: 36 additions & 0 deletions packages/pynumaflow/pynumaflow/proto/common/nack_options_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional

DESCRIPTOR: _descriptor.FileDescriptor

class NackOptions(_message.Message):
__slots__ = ("reason", "max_deliveries", "delay")
REASON_FIELD_NUMBER: _ClassVar[int]
MAX_DELIVERIES_FIELD_NUMBER: _ClassVar[int]
DELAY_FIELD_NUMBER: _ClassVar[int]
reason: str
max_deliveries: int
delay: int
def __init__(self, reason: _Optional[str] = ..., max_deliveries: _Optional[int] = ..., delay: _Optional[int] = ...) -> None: ...
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings


GRPC_GENERATED_VERSION = '1.75.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False

try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True

if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ f' but the generated code in pynumaflow/proto/common/nack_options_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
2 changes: 2 additions & 0 deletions packages/pynumaflow/pynumaflow/proto/mapper/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "pynumaflow/proto/common/metadata.proto";
import "pynumaflow/proto/common/nack_options.proto";

package map.v1;

Expand Down Expand Up @@ -61,6 +62,7 @@ message MapResponse {
repeated string tags = 3;
// metadata of the message
common.Metadata metadata = 4;
optional common.NackOptions nack_options = 5;
}
repeated Result results = 1;
// This ID is used to refer the responses to the request it corresponds to.
Expand Down
Loading
Loading