Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sentry_streams/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sentry_streams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ extension-module = ["pyo3/extension-module"]
cli = ["dep:clap", "pyo3/auto-initialize"]

[dev-dependencies]
metrics-util = { version = "0.20.1", default-features = false, features = ["debugging"] }
ordered-float = { version = "5", default-features = false }
parking_lot = "0.12.1"
pyo3 = { version = "*", features = ["auto-initialize"] }

Expand Down
36 changes: 25 additions & 11 deletions sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def finalize_chain(
chains: TransformChains,
route: Route,
metrics_config: MetricsConfig,
segment_label: str,
) -> RuntimeOperator:
rust_route = RustRoute(route.source, route.waypoints)
config, func = chains.finalize(route)
Expand All @@ -209,6 +210,7 @@ def finalize_chain(
)

return RuntimeOperator.PythonAdapter(
segment_label,
rust_route,
MultiprocessDelegateFactory(
func,
Expand All @@ -226,7 +228,7 @@ def finalize_chain(
)
else:
logger.info(f"Finalizing chain for route {route} without multiprocessing")
return RuntimeOperator.Map(rust_route, lambda msg: func(msg).to_inner())
return RuntimeOperator.Map(segment_label, rust_route, lambda msg: func(msg).to_inner())


class RustArroyoAdapter(StreamAdapter[Route, Route]):
Expand Down Expand Up @@ -259,8 +261,9 @@ def build( # type: ignore[override]
def __close_chain(self, stream: Route) -> None:
if self.__chains.exists(stream):
logger.info(f"Closing transformation chain: {stream} and adding to pipeline")
segment_label = self.__chains.segment_label(stream)
self.__consumers[stream.source].add_step(
finalize_chain(self.__chains, stream, self.__metrics_config)
finalize_chain(self.__chains, stream, self.__metrics_config, segment_label),
)

def get_consumer(self, source: str) -> ArroyoConsumer:
Expand Down Expand Up @@ -332,19 +335,22 @@ def wrapped_generator() -> str:

logger.info(f"Adding GCS sink: {step.name} to pipeline")
self.__consumers[stream.source].add_step(
RuntimeOperator.GCSSink(route, step.bucket, wrapped_generator, step.thread_count)
RuntimeOperator.GCSSink(
step.name, route, step.bucket, wrapped_generator, step.thread_count
),
)

elif isinstance(step, DevNullSink):
logger.info(f"Adding DevNull sink: {step.name} to pipeline")
self.__consumers[stream.source].add_step(
RuntimeOperator.DevNullSink(
step.name,
route,
batch_size=step.batch_size,
batch_time_ms=step.batch_time_ms,
average_sleep_time_ms=step.average_sleep_time_ms,
max_sleep_time_ms=step.max_sleep_time_ms,
)
),
)

# Our fallback for now since there's no other Sink type
Expand All @@ -353,10 +359,11 @@ def wrapped_generator() -> str:
logger.info(f"Adding stream sink: {step.name} to pipeline")
self.__consumers[stream.source].add_step(
RuntimeOperator.StreamSink(
step.name,
route,
step.stream_name,
build_kafka_producer_config(step.name, self.steps_config),
)
),
)

return stream
Expand Down Expand Up @@ -449,7 +456,9 @@ def filter_msg(msg: Message[Any]) -> bool:
route = RustRoute(stream.source, stream.waypoints)
logger.info(f"Adding filter: {step.name} to pipeline")

self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg))
self.__consumers[stream.source].add_step(
RuntimeOperator.Filter(step.name, route, filter_msg)
)

return stream

Expand All @@ -472,7 +481,7 @@ def reduce(
logger.info(f"Adding reduce: {step.name} to pipeline")

self.__consumers[stream.source].add_step(
RuntimeOperator.PythonAdapter(route, ReduceDelegateFactory(step))
RuntimeOperator.PythonAdapter(step.name, route, ReduceDelegateFactory(step)),
)
return stream

Expand All @@ -495,8 +504,10 @@ def broadcast(
logger.info(f"Adding broadcast: {step.name} to pipeline")
self.__consumers[stream.source].add_step(
RuntimeOperator.Broadcast(
route, downstream_routes=[branch.root.name for branch in step.routes]
)
step.name,
route,
downstream_routes=[branch.root.name for branch in step.routes],
),
)
return build_branches(stream, step.routes)

Expand Down Expand Up @@ -534,8 +545,11 @@ def routing_function(msg: Message[Any]) -> str:
logger.info(f"Adding router: {step.name} to pipeline")
self.__consumers[stream.source].add_step(
RuntimeOperator.Router(
route, routing_function, cast(Sequence[str], step.routing_table.values())
)
step.name,
route,
routing_function,
cast(Sequence[str], step.routing_table.values()),
),
)
return build_branches(stream, step.routing_table.values())

Expand Down
12 changes: 12 additions & 0 deletions sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ def add_map(self, route: Route, step: Map[Any, Any]) -> None:
raise ValueError(f"Chain {route} not initialized")
self.__chains[hashable_route].steps.append(step)

def segment_label(self, route: Route) -> str:
    """Return the metric label for the chained map segment of ``route``.

    The label is the name of the first map step accumulated in the
    chain; when the chain has no steps yet, the route's source name is
    used as a fallback. Must be called before ``finalize`` consumes the
    chain.

    Raises:
        ValueError: if no chain has been initialized for ``route``.
    """
    key = _hashable_route(route)
    if key not in self.__chains:
        raise ValueError(f"No chain for route {route}")
    chain_steps = self.__chains[key].steps
    return chain_steps[0].name if chain_steps else route.source

def finalize(
self, route: Route
) -> Tuple[MultiProcessConfig | None, Callable[[Message[Any]], Message[Any]]]:
Expand Down
23 changes: 18 additions & 5 deletions sentry_streams/sentry_streams/rust_streams.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,26 @@ class PyMetricConfig:
def flush_interval_ms(self) -> int | None: ...

class RuntimeOperator:
@property
def step_name(self) -> str: ...
@classmethod
def Map(cls, route: Route, function: Callable[[Message[Any]], Any]) -> Self: ...
def Map(cls, step_name: str, route: Route, function: Callable[[Message[Any]], Any]) -> Self: ...
@classmethod
def Filter(cls, route: Route, function: Callable[[Message[Any]], bool]) -> Self: ...
def Filter(
cls, step_name: str, route: Route, function: Callable[[Message[Any]], bool]
) -> Self: ...
@classmethod
def StreamSink(
cls, route: Route, topic_name: str, kafka_config: PyKafkaProducerConfig
cls,
step_name: str,
route: Route,
topic_name: str,
kafka_config: PyKafkaProducerConfig,
) -> Self: ...
@classmethod
def GCSSink(
cls,
step_name: str,
route: Route,
bucket: str,
object_generator: Callable[[], str],
Expand All @@ -78,6 +87,7 @@ class RuntimeOperator:
@classmethod
def DevNullSink(
cls,
step_name: str,
route: Route,
batch_size: int | None = None,
batch_time_ms: float | None = None,
Expand All @@ -87,14 +97,17 @@ class RuntimeOperator:
@classmethod
def Router(
cls,
step_name: str,
route: Route,
function: Callable[[Message[Any]], str],
downstream_routes: Sequence[str],
) -> Self: ...
@classmethod
def Broadcast(cls, route: Route, downstream_routes: Sequence[str]) -> Self: ...
def Broadcast(cls, step_name: str, route: Route, downstream_routes: Sequence[str]) -> Self: ...
@classmethod
def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ...
def PythonAdapter(
cls, step_name: str, route: Route, delegate_Factory: RustOperatorFactory
) -> Self: ...

class ArroyoConsumer:
def __init__(
Expand Down
Loading
Loading