Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion granc-core/src/client/online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
//! This module defines the `GrancClient` behavior when it is connected to a server
//! and using Server Reflection for schema resolution.
use super::{
Descriptor, DynamicRequest, DynamicResponse, GrancClient, Online, OnlineWithoutReflection,
Descriptor, DynamicRequest, DynamicResponse, DynamicStreamResponse, GrancClient, Online,
OnlineWithoutReflection,
};
use crate::{
BoxError,
Expand Down Expand Up @@ -221,4 +222,31 @@ where

Ok(client.dynamic(request).await?)
}

/// Executes a dynamic gRPC request using Server Reflection, preserving streaming semantics.
///
/// Unlike [`dynamic()`](Self::dynamic), which collects all streaming responses into a `Vec`,
/// this method returns a [`BoxStream`](futures_util::stream::BoxStream) for streaming RPCs,
/// allowing the caller to consume messages incrementally as they arrive.
///
/// For unary and client-streaming calls, the behavior is identical to `dynamic()`.
pub async fn dynamic_stream(
&mut self,
request: DynamicRequest,
) -> Result<DynamicStreamResponse, DynamicCallError> {
let fd_set = self
.state
.reflection_client
.file_descriptor_set_by_symbol(&request.service)
.await?;

let pool = DescriptorPool::from_file_descriptor_set(fd_set)?;

let mut client = GrancClient::new(OnlineWithoutReflection::new(
self.state.grpc_client.clone(),
pool,
));

Ok(client.dynamic_stream(request).await?)
}
}
66 changes: 65 additions & 1 deletion granc-core/src/client/online_without_reflection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! This module defines the `GrancClient` behavior when it is connected to a server
//! but uses a local, in-memory `DescriptorPool` (Static schema) to resolve messages.
use super::{DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection};
use super::{DynamicRequest, DynamicResponse, DynamicStreamResponse, GrancClient, OnlineWithoutReflection};
use crate::{BoxError, client::OfflineReflectionState, grpc::client::GrpcRequestError};
use futures_util::{Stream, StreamExt};
use http_body::Body as HttpBody;
Expand Down Expand Up @@ -104,6 +104,70 @@ where
}
}
}

/// Executes a dynamic gRPC request, preserving streaming semantics.
///
/// Unlike [`dynamic()`](Self::dynamic), which collects all streaming responses into a `Vec`,
/// this method returns a [`BoxStream`](futures_util::stream::BoxStream) for streaming RPCs,
/// allowing the caller to consume messages incrementally as they arrive.
///
/// For unary and client-streaming calls, the behavior is identical to `dynamic()`.
pub async fn dynamic_stream(
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

honestly looking at the implementation, we can simply unify this into the 'dynamic' method

&mut self,
request: DynamicRequest,
) -> Result<DynamicStreamResponse, DynamicCallError> {
let method = self
.state
.descriptor_pool()
.get_service_by_name(&request.service)
.ok_or_else(|| DynamicCallError::ServiceNotFound(request.service.clone()))?
.methods()
.find(|m| m.name() == request.method)
.ok_or_else(|| DynamicCallError::MethodNotFound(request.method.clone()))?;

match (method.is_client_streaming(), method.is_server_streaming()) {
(false, false) => {
let result = self
.state
.grpc_client
.unary(method, request.body, request.headers)
.await?;
Ok(DynamicStreamResponse::Single(result))
}
(false, true) => match self
.state
.grpc_client
.server_streaming(method, request.body, request.headers)
.await?
{
Ok(stream) => Ok(DynamicStreamResponse::Streaming(stream.boxed())),
Err(status) => Ok(DynamicStreamResponse::Single(Err(status))),
},
(true, false) => {
let input_stream =
json_array_to_stream(request.body).map_err(DynamicCallError::InvalidInput)?;
let result = self
.state
.grpc_client
.client_streaming(method, input_stream, request.headers)
.await?;
Ok(DynamicStreamResponse::Single(result))
}
(true, true) => {
let input_stream =
json_array_to_stream(request.body).map_err(DynamicCallError::InvalidInput)?;
match self
.state
.grpc_client
.bidirectional_streaming(method, input_stream, request.headers)
.await?
{
Ok(stream) => Ok(DynamicStreamResponse::Streaming(stream.boxed())),
Err(status) => Ok(DynamicStreamResponse::Single(Err(status))),
}
}
}
}
}

fn json_array_to_stream(
Expand Down
13 changes: 13 additions & 0 deletions granc-core/src/client/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures_util::stream::BoxStream;
use prost_reflect::{EnumDescriptor, MessageDescriptor, ServiceDescriptor};
use std::fmt::Debug;

Expand Down Expand Up @@ -25,6 +26,18 @@ pub enum DynamicResponse {
Streaming(Result<Vec<Result<serde_json::Value, tonic::Status>>, tonic::Status>),
}

/// The result of a dynamic gRPC call that preserves streaming semantics.
///
/// Unlike [`DynamicResponse`], which collects all streaming messages into a `Vec`,
/// this type returns a [`BoxStream`] for streaming RPCs, allowing the caller to
/// consume messages incrementally as they arrive.
pub enum DynamicStreamResponse {
/// A single response message (for Unary and Client Streaming calls).
Single(Result<serde_json::Value, tonic::Status>),
/// A stream of response messages (for Server Streaming and Bidirectional calls).
Streaming(BoxStream<'static, Result<serde_json::Value, tonic::Status>>),
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe there is no need for a new type here, we can simply update the implementation of 'dynamic' and update the original DynamicResponse::Stream to use a BoxStream

This way we have a consistent API with a single dynamic function

Don't worry about breaking changes, we can always create a new breaking release (minor in this case since we are under version 0.X.X)

}

/// A generic wrapper for different types of Protobuf descriptors.
///
/// This enum allows the client to return a single type when resolving symbols,
Expand Down
4 changes: 2 additions & 2 deletions granc-core/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ where
payload: serde_json::Value,
headers: Vec<(String, String)>,
) -> Result<
Result<impl Stream<Item = Result<serde_json::Value, tonic::Status>>, tonic::Status>,
Result<tonic::Streaming<serde_json::Value>, tonic::Status>,
GrpcRequestError,
> {
self.client
Expand Down Expand Up @@ -164,7 +164,7 @@ where
payload_stream: impl Stream<Item = serde_json::Value> + Send + 'static,
headers: Vec<(String, String)>,
) -> Result<
Result<impl Stream<Item = Result<serde_json::Value, tonic::Status>>, tonic::Status>,
Result<tonic::Streaming<serde_json::Value>, tonic::Status>,
GrpcRequestError,
> {
self.client
Expand Down
50 changes: 49 additions & 1 deletion granc-core/tests/granc_client_online_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use echo_service_impl::EchoServiceImpl;
use granc_core::client::{DynamicRequest, DynamicResponse, GrancClient, Online, online};
use futures_util::StreamExt;
use granc_core::client::{
DynamicRequest, DynamicResponse, DynamicStreamResponse, GrancClient, Online, online,
};
use granc_core::reflection::client::ReflectionResolveError;
use granc_test_support::echo_service::{EchoServiceServer, FILE_DESCRIPTOR_SET};
use tonic::Code;
Expand Down Expand Up @@ -175,3 +178,48 @@ async fn test_reflection_schema_mismatch() {
Ok(DynamicResponse::Unary(Err(status))) if status.code() == Code::Internal
));
}

#[tokio::test]
async fn test_reflection_dynamic_stream_unary() {
let mut client = setup_client().await;

let req = DynamicRequest {
service: "echo.EchoService".to_string(),
method: "UnaryEcho".to_string(),
body: serde_json::json!({ "message": "reflection stream" }),
headers: vec![],
};

let res = client.dynamic_stream(req).await.unwrap();
assert!(
matches!(res, DynamicStreamResponse::Single(Ok(val)) if val["message"] == "reflection stream")
);
}

#[tokio::test]
async fn test_reflection_dynamic_stream_server_streaming() {
let mut client = setup_client().await;

let req = DynamicRequest {
service: "echo.EchoService".to_string(),
method: "ServerStreamingEcho".to_string(),
body: serde_json::json!({ "message": "stream" }),
headers: vec![],
};

let res = client.dynamic_stream(req).await.unwrap();

match res {
DynamicStreamResponse::Streaming(mut stream) => {
let mut messages = Vec::new();
while let Some(item) = stream.next().await {
messages.push(item.unwrap());
}
assert_eq!(messages.len(), 3);
assert_eq!(messages[0]["message"], "stream - seq 0");
assert_eq!(messages[1]["message"], "stream - seq 1");
assert_eq!(messages[2]["message"], "stream - seq 2");
}
_ => panic!("Expected Streaming response"),
}
}
80 changes: 79 additions & 1 deletion granc-core/tests/granc_client_online_without_reflection_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use echo_service_impl::EchoServiceImpl;
use futures_util::StreamExt;
use granc_core::client::{
DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection,
DynamicRequest, DynamicResponse, DynamicStreamResponse, GrancClient, OnlineWithoutReflection,
online_without_reflection,
};
use granc_test_support::echo_service::{EchoServiceServer, FILE_DESCRIPTOR_SET};
Expand Down Expand Up @@ -192,3 +193,80 @@ async fn test_error_schema_mismatch() {
&& status.message().contains("JSON structure does not match Protobuf schema")
));
}

#[tokio::test]
async fn test_dynamic_stream_unary_returns_single() {
let mut client = setup_client();

let req = DynamicRequest {
service: "echo.EchoService".to_string(),
method: "UnaryEcho".to_string(),
body: serde_json::json!({ "message": "hello" }),
headers: vec![],
};

let res = client.dynamic_stream(req).await.unwrap();

assert!(matches!(
res,
DynamicStreamResponse::Single(Ok(val)) if val["message"] == "hello"
));
}

#[tokio::test]
async fn test_dynamic_stream_server_streaming_returns_stream() {
let mut client = setup_client();

let req = DynamicRequest {
service: "echo.EchoService".to_string(),
method: "ServerStreamingEcho".to_string(),
body: serde_json::json!({ "message": "stream" }),
headers: vec![],
};

let res = client.dynamic_stream(req).await.unwrap();

match res {
DynamicStreamResponse::Streaming(mut stream) => {
let mut messages = Vec::new();
while let Some(item) = stream.next().await {
messages.push(item.unwrap());
}
assert_eq!(messages.len(), 3);
assert_eq!(messages[0]["message"], "stream - seq 0");
assert_eq!(messages[1]["message"], "stream - seq 1");
assert_eq!(messages[2]["message"], "stream - seq 2");
}
_ => panic!("Expected Streaming response"),
}
}

#[tokio::test]
async fn test_dynamic_stream_bidirectional_returns_stream() {
let mut client = setup_client();

let req = DynamicRequest {
service: "echo.EchoService".to_string(),
method: "BidirectionalEcho".to_string(),
body: serde_json::json!([
{ "message": "Ping" },
{ "message": "Pong" }
]),
headers: vec![],
};

let res = client.dynamic_stream(req).await.unwrap();

match res {
DynamicStreamResponse::Streaming(mut stream) => {
let mut messages = Vec::new();
while let Some(item) = stream.next().await {
messages.push(item.unwrap());
}
assert_eq!(messages.len(), 2);
assert_eq!(messages[0]["message"], "echo: Ping");
assert_eq!(messages[1]["message"], "echo: Pong");
}
_ => panic!("Expected Streaming response"),
}
}