Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,4 @@ tasks:
--grpc_python_out={{.OUT_DIR}} \
--mypy_grpc_out={{.OUT_DIR}} \
-I proto \
proto/redpanda/runtime/proto/runtime.proto
proto/redpanda/runtime/v1alpha1/*.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,59 +16,18 @@ syntax = "proto3";

package redpanda.runtime.v1alpha1;

option go_package = "internal/agent/runtimepb";
option go_package = "github.com/redpanda-data/connect/v4/internal/agent/runtimepb";

import "google/protobuf/timestamp.proto";

// `NullValue` is a representation of a null value.
enum NullValue { NULL_VALUE = 0; }

// `StructValue` represents a struct value which can be used to represent a
// structured data value.
message StructValue {
map<string, Value> fields = 1;
}

// `ListValue` represents a list value which can be used to represent a list of
// values.
message ListValue {
repeated Value values = 1;
}

// `Value` represents a dynamically typed value which can be used to represent
// a value passed to an agent.
message Value {
oneof kind {
NullValue null_value = 1;
string string_value = 2;
int64 integer_value = 3;
double double_value = 4;
bool bool_value = 5;
google.protobuf.Timestamp timestamp_value = 6;
bytes bytes_value = 7;
StructValue struct_value = 8;
ListValue list_value = 9;
}
}

// Message represents a piece of structured data that flows through the runtime.
message Message {
oneof payload {
bytes serialized = 1;
Value structured = 2;
}
StructValue metadata = 3;
}
import "redpanda/runtime/v1alpha1/message.proto";

message TraceContext {
string trace_id = 1;
string span_id = 2;
string trace_flags = 4;
}

message Trace {
repeated Span spans = 1;
}
message Trace { repeated Span spans = 1; }

message Span {
string span_id = 1;
Expand All @@ -93,7 +52,7 @@ message InvokeAgentResponse {
Trace trace = 2;
}

// `Runtime` is the service that provides the ability to invoke an agent.
service Runtime {
// `AgentRuntime` is the service that provides the ability to invoke an agent.
service AgentRuntime {
rpc InvokeAgent(InvokeAgentRequest) returns (InvokeAgentResponse);
}
94 changes: 94 additions & 0 deletions proto/redpanda/runtime/v1alpha1/message.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2025 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package redpanda.runtime.v1alpha1;

option go_package = "github.com/redpanda-data/connect/v4/internal/rpcplugin/runtimepb";

import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";

// `NullValue` is a representation of a null value.
enum NullValue {
NULL_VALUE = 0;
}

// `StructValue` represents a struct value which can be used to represent a
// structured data value.
message StructValue { map<string, Value> fields = 1; }

// `ListValue` represents a list value which can be used to represent a list of
// values.
message ListValue { repeated Value values = 1; }

// `Value` represents a dynamically typed value which can be used to represent
// a value within a Redpanda Connect pipeline.
message Value {
oneof kind {
NullValue null_value = 1;
string string_value = 2;
int64 integer_value = 3;
double double_value = 4;
bool bool_value = 5;
google.protobuf.Timestamp timestamp_value = 6;
bytes bytes_value = 7;
StructValue struct_value = 8;
ListValue list_value = 9;
}
}

// An error in the context of a data pipeline.
message Error {
// The error message. If non empty, then the error to be "valid" and
// if empty the error is ignored as if a success (due to proto3 empty
// semantics).
string message = 1;
// NotConnected is returned by inputs and outputs when their Read or
// Write methods are called and the connection that they maintain is lost.
// This error prompts the upstream component to call Connect until the
// connection is re-established.
message NotConnected {}
// EndOfInput is returned by inputs that have exhausted their source of
// data to the point where subsequent Read calls will be ineffective. This
// error prompts the upstream component to gracefully terminate the
// pipeline.
message EndOfInput {}
// Additional error details for specific Redpanda Connect behavior.
// If one of these fields is set, then message must be non-empty.
oneof detail {
// BackOff is an error that plugins can optionally wrap another error with
// which instructs upstream components to wait for a specified period of
// time before retrying the errored call.
//
// Only suppported by Connect methods in the Input and Output services.
google.protobuf.Duration backoff = 2;
NotConnected not_connected = 3;
EndOfInput end_of_input = 4;
}
}

// Message represents a piece of data or an event that flows through the
// runtime.
message Message {
oneof payload {
bytes bytes = 1;
Value structured = 2;
}
StructValue metadata = 3;
Error error = 4;
}

message MessageBatch { repeated Message messages = 1; }
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies = [
"aiohttp>=3.11.16",
"grpcio>=1.71.0",
"grpcio-health-checking>=1.71.0",
"litellm>=1.63.14",
"litellm>=1.70.0",
"mcp>=1.5.0",
"opentelemetry-api>=1.32.1",
"opentelemetry-sdk>=1.32.1",
Expand Down Expand Up @@ -45,7 +45,7 @@ redpanda = { workspace = true }
[tool.ruff]
line-length = 100
target-version = "py39"
exclude = ["proto"]
exclude = ["v1alpha1"]

[tool.ruff.lint]
select = [
Expand Down Expand Up @@ -74,7 +74,7 @@ packages = ["src/redpanda"]

[tool.pyright]
exclude = [
"src/redpanda/runtime/proto/**",
"src/redpanda/runtime/v1alpha1/**",
"**/__pycache__",
"**/.*",
]
Expand Down
18 changes: 11 additions & 7 deletions src/redpanda/runtime/_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
from pydantic import BaseModel

from redpanda.agents import Agent
from redpanda.runtime.proto import runtime_pb2 as pb, runtime_pb2_grpc as grpcpb
from redpanda.runtime.v1alpha1 import (
agent_pb2 as pb,
agent_pb2_grpc as grpcpb,
message_pb2 as msg_pb,
)

from ._otel import convert_spans, current_spans_context_var


def _serialize_payload(payload: pb.Value) -> str:
def _serialize_payload(payload: msg_pb.Value) -> str:
kind = payload.WhichOneof("kind")
if kind == "bool_value":
return "true" if payload.bool_value else "false"
Expand Down Expand Up @@ -67,7 +71,7 @@ def _serialize_payload(payload: pb.Value) -> str:
raise ValueError(f"Unknown payload kind: {kind}")


class RuntimeServer(grpcpb.RuntimeServicer):
class RuntimeServer(grpcpb.AgentRuntimeServicer):
agent: Agent
tracer: trace.Tracer

Expand Down Expand Up @@ -97,16 +101,16 @@ async def InvokeAgent(
if request.message.WhichOneof("payload") == "structured":
payload = _serialize_payload(request.message.structured)
else:
payload = request.message.serialized.decode("utf-8")
payload = request.message.bytes.decode("utf-8")
with self.tracer.start_as_current_span("agent_invoke", context=trace_ctx):
output = await self.agent.run(input=payload)
if isinstance(output, BaseModel):
output = output.model_dump_json()
elif not isinstance(output, str):
output = json.dumps(output)
return pb.InvokeAgentResponse(
message=pb.Message(
serialized=output.encode("utf-8"),
message=msg_pb.Message(
bytes=output.encode("utf-8"),
metadata=request.message.metadata,
),
trace=pb.Trace(spans=convert_spans(spans)) if trace_ctx else None,
Expand All @@ -126,7 +130,7 @@ async def serve_main(runtime_server: RuntimeServer):
health_pb2.HealthCheckResponse.ServingStatus.Value("SERVING"),
)
server = grpc.aio.server()
grpcpb.add_RuntimeServicer_to_server(
grpcpb.add_AgentRuntimeServicer_to_server(
runtime_server,
server,
)
Expand Down
8 changes: 4 additions & 4 deletions src/redpanda/runtime/_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from opentelemetry.sdk.trace import export as traceexport
from opentelemetry.util import types as oteltypes

from redpanda.runtime.proto import runtime_pb2 as pb
from redpanda.runtime.v1alpha1 import agent_pb2 as pb, message_pb2 as msg_pb


def _proto_timestamp_from_time_ns(time_ns: int | None) -> PbTimestamp:
Expand All @@ -33,12 +33,12 @@ def _proto_timestamp_from_time_ns(time_ns: int | None) -> PbTimestamp:
return ts


def _convert_span_attributes(attrs: oteltypes.Attributes) -> dict[str, pb.Value]:
def _convert_span_attributes(attrs: oteltypes.Attributes) -> dict[str, msg_pb.Value]:
if attrs is None:
return {}
pb_attrs: dict[str, pb.Value] = {}
pb_attrs: dict[str, msg_pb.Value] = {}
for k, v in attrs.items():
pb_v = pb.Value()
pb_v = msg_pb.Value()
if isinstance(v, str):
pb_v.string_value = v
elif isinstance(v, bool):
Expand Down
66 changes: 0 additions & 66 deletions src/redpanda/runtime/proto/runtime_pb2.py

This file was deleted.

Loading