From 72ae198a65f0e8e8c44253e09da1fe42eb6ffe8a Mon Sep 17 00:00:00 2001 From: Randolf Jung Date: Sun, 22 Feb 2026 19:18:08 -0800 Subject: [PATCH] feat(granc-core): add dynamic_stream() for true streaming support Add DynamicStreamResponse type and dynamic_stream() method that returns a BoxStream instead of collecting all responses into a Vec. This allows callers to consume server-streaming and bidirectional-streaming messages incrementally as they arrive. The existing dynamic() method and DynamicResponse type remain unchanged for backward compatibility. Also makes GrpcClient::server_streaming() and bidirectional_streaming() return concrete tonic::Streaming instead of impl Stream, which enables the 'static lifetime bound needed by BoxStream. --- granc-core/src/client/online.rs | 30 ++++++- .../src/client/online_without_reflection.rs | 66 ++++++++++++++- granc-core/src/client/types.rs | 13 +++ granc-core/src/grpc/client.rs | 4 +- granc-core/tests/granc_client_online_test.rs | 50 +++++++++++- ...c_client_online_without_reflection_test.rs | 80 ++++++++++++++++++- 6 files changed, 237 insertions(+), 6 deletions(-) diff --git a/granc-core/src/client/online.rs b/granc-core/src/client/online.rs index 793fc86..4d5bbf8 100644 --- a/granc-core/src/client/online.rs +++ b/granc-core/src/client/online.rs @@ -3,7 +3,8 @@ //! This module defines the `GrancClient` behavior when it is connected to a server //! and using Server Reflection for schema resolution. use super::{ - Descriptor, DynamicRequest, DynamicResponse, GrancClient, Online, OnlineWithoutReflection, + Descriptor, DynamicRequest, DynamicResponse, DynamicStreamResponse, GrancClient, Online, + OnlineWithoutReflection, }; use crate::{ BoxError, @@ -221,4 +222,31 @@ where Ok(client.dynamic(request).await?) } + + /// Executes a dynamic gRPC request using Server Reflection, preserving streaming semantics. + /// + /// Unlike [`dynamic()`](Self::dynamic), which collects all streaming responses into a `Vec`, + /// this method returns a [`BoxStream`](futures_util::stream::BoxStream) for streaming RPCs, + /// allowing the caller to consume messages incrementally as they arrive. + /// + /// For unary and client-streaming calls, the behavior is identical to `dynamic()`. + pub async fn dynamic_stream( + &mut self, + request: DynamicRequest, + ) -> Result { + let fd_set = self + .state + .reflection_client + .file_descriptor_set_by_symbol(&request.service) + .await?; + + let pool = DescriptorPool::from_file_descriptor_set(fd_set)?; + + let mut client = GrancClient::new(OnlineWithoutReflection::new( + self.state.grpc_client.clone(), + pool, + )); + + Ok(client.dynamic_stream(request).await?) + } } diff --git a/granc-core/src/client/online_without_reflection.rs b/granc-core/src/client/online_without_reflection.rs index d251bd0..6c8705e 100644 --- a/granc-core/src/client/online_without_reflection.rs +++ b/granc-core/src/client/online_without_reflection.rs @@ -2,7 +2,7 @@ //! //! This module defines the `GrancClient` behavior when it is connected to a server //! but uses a local, in-memory `DescriptorPool` (Static schema) to resolve messages. -use super::{DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection}; +use super::{DynamicRequest, DynamicResponse, DynamicStreamResponse, GrancClient, OnlineWithoutReflection}; use crate::{BoxError, client::OfflineReflectionState, grpc::client::GrpcRequestError}; use futures_util::{Stream, StreamExt}; use http_body::Body as HttpBody; @@ -104,6 +104,70 @@ where } } } + + /// Executes a dynamic gRPC request, preserving streaming semantics. + /// + /// Unlike [`dynamic()`](Self::dynamic), which collects all streaming responses into a `Vec`, + /// this method returns a [`BoxStream`](futures_util::stream::BoxStream) for streaming RPCs, + /// allowing the caller to consume messages incrementally as they arrive. + /// + /// For unary and client-streaming calls, the behavior is identical to `dynamic()`. + pub async fn dynamic_stream( + &mut self, + request: DynamicRequest, + ) -> Result { + let method = self + .state + .descriptor_pool() + .get_service_by_name(&request.service) + .ok_or_else(|| DynamicCallError::ServiceNotFound(request.service.clone()))? + .methods() + .find(|m| m.name() == request.method) + .ok_or_else(|| DynamicCallError::MethodNotFound(request.method.clone()))?; + + match (method.is_client_streaming(), method.is_server_streaming()) { + (false, false) => { + let result = self + .state + .grpc_client + .unary(method, request.body, request.headers) + .await?; + Ok(DynamicStreamResponse::Single(result)) + } + (false, true) => match self + .state + .grpc_client + .server_streaming(method, request.body, request.headers) + .await? + { + Ok(stream) => Ok(DynamicStreamResponse::Streaming(stream.boxed())), + Err(status) => Ok(DynamicStreamResponse::Single(Err(status))), + }, + (true, false) => { + let input_stream = + json_array_to_stream(request.body).map_err(DynamicCallError::InvalidInput)?; + let result = self + .state + .grpc_client + .client_streaming(method, input_stream, request.headers) + .await?; + Ok(DynamicStreamResponse::Single(result)) + } + (true, true) => { + let input_stream = + json_array_to_stream(request.body).map_err(DynamicCallError::InvalidInput)?; + match self + .state + .grpc_client + .bidirectional_streaming(method, input_stream, request.headers) + .await? + { + Ok(stream) => Ok(DynamicStreamResponse::Streaming(stream.boxed())), + Err(status) => Ok(DynamicStreamResponse::Single(Err(status))), + } + } + } + } } fn json_array_to_stream( diff --git a/granc-core/src/client/types.rs b/granc-core/src/client/types.rs index eae387a..65c2e21 100644 --- a/granc-core/src/client/types.rs +++ b/granc-core/src/client/types.rs @@ -1,3 +1,4 @@ +use futures_util::stream::BoxStream; use prost_reflect::{EnumDescriptor, MessageDescriptor, ServiceDescriptor}; use std::fmt::Debug; @@ -25,6 +26,18 @@ pub enum DynamicResponse { Streaming(Result>, tonic::Status>), } +/// The result of a dynamic gRPC call that preserves streaming semantics. +/// +/// Unlike [`DynamicResponse`], which collects all streaming messages into a `Vec`, +/// this type returns a [`BoxStream`] for streaming RPCs, allowing the caller to +/// consume messages incrementally as they arrive. +pub enum DynamicStreamResponse { + /// A single response message (for Unary and Client Streaming calls). + Single(Result), + /// A stream of response messages (for Server Streaming and Bidirectional calls). + Streaming(BoxStream<'static, Result>), +} + /// A generic wrapper for different types of Protobuf descriptors. /// /// This enum allows the client to return a single type when resolving symbols, diff --git a/granc-core/src/grpc/client.rs b/granc-core/src/grpc/client.rs index 5ed9164..91dcebe 100644 --- a/granc-core/src/grpc/client.rs +++ b/granc-core/src/grpc/client.rs @@ -105,7 +105,7 @@ where payload: serde_json::Value, headers: Vec<(String, String)>, ) -> Result< - Result>, tonic::Status>, + Result, tonic::Status>, GrpcRequestError, > { self.client @@ -164,7 +164,7 @@ where payload_stream: impl Stream + Send + 'static, headers: Vec<(String, String)>, ) -> Result< - Result>, tonic::Status>, + Result, tonic::Status>, GrpcRequestError, > { self.client diff --git a/granc-core/tests/granc_client_online_test.rs b/granc-core/tests/granc_client_online_test.rs index 912d4ed..cc886d0 100644 --- a/granc-core/tests/granc_client_online_test.rs +++ b/granc-core/tests/granc_client_online_test.rs @@ -1,5 +1,8 @@ use echo_service_impl::EchoServiceImpl; -use granc_core::client::{DynamicRequest, DynamicResponse, GrancClient, Online, online}; +use futures_util::StreamExt; +use granc_core::client::{ + DynamicRequest, DynamicResponse, DynamicStreamResponse, GrancClient, Online, online, +}; use granc_core::reflection::client::ReflectionResolveError; use granc_test_support::echo_service::{EchoServiceServer, FILE_DESCRIPTOR_SET}; use tonic::Code; @@ -175,3 +178,48 @@ async fn test_reflection_schema_mismatch() { Ok(DynamicResponse::Unary(Err(status))) if status.code() == Code::Internal )); } + +#[tokio::test] +async fn test_reflection_dynamic_stream_unary() { + let mut client = setup_client().await; + + let req = DynamicRequest { + service: "echo.EchoService".to_string(), + method: "UnaryEcho".to_string(), + body: serde_json::json!({ "message": "reflection stream" }), + headers: vec![], + }; + + let res = client.dynamic_stream(req).await.unwrap(); + assert!( + matches!(res, DynamicStreamResponse::Single(Ok(val)) if val["message"] == "reflection stream") + ); +} + +#[tokio::test] +async fn test_reflection_dynamic_stream_server_streaming() { + let mut client = setup_client().await; + + let req = DynamicRequest { + service: "echo.EchoService".to_string(), + method: "ServerStreamingEcho".to_string(), + body: serde_json::json!({ "message": "stream" }), + headers: vec![], + }; + + let res = client.dynamic_stream(req).await.unwrap(); + + match res { + DynamicStreamResponse::Streaming(mut stream) => { + let mut messages = Vec::new(); + while let Some(item) = stream.next().await { + messages.push(item.unwrap()); + } + assert_eq!(messages.len(), 3); + assert_eq!(messages[0]["message"], "stream - seq 0"); + assert_eq!(messages[1]["message"], "stream - seq 1"); + assert_eq!(messages[2]["message"], "stream - seq 2"); + } + _ => panic!("Expected Streaming response"), + } +} diff --git a/granc-core/tests/granc_client_online_without_reflection_test.rs b/granc-core/tests/granc_client_online_without_reflection_test.rs index 07ba4fc..dc35880 100644 --- a/granc-core/tests/granc_client_online_without_reflection_test.rs +++ b/granc-core/tests/granc_client_online_without_reflection_test.rs @@ -1,6 +1,7 @@ use echo_service_impl::EchoServiceImpl; +use futures_util::StreamExt; use granc_core::client::{ - DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection, + DynamicRequest, DynamicResponse, DynamicStreamResponse, GrancClient, OnlineWithoutReflection, online_without_reflection, }; use granc_test_support::echo_service::{EchoServiceServer, FILE_DESCRIPTOR_SET}; @@ -192,3 +193,80 @@ async fn test_error_schema_mismatch() { && status.message().contains("JSON structure does not match Protobuf schema") )); } + +#[tokio::test] +async fn test_dynamic_stream_unary_returns_single() { + let mut client = setup_client(); + + let req = DynamicRequest { + service: "echo.EchoService".to_string(), + method: "UnaryEcho".to_string(), + body: serde_json::json!({ "message": "hello" }), + headers: vec![], + }; + + let res = client.dynamic_stream(req).await.unwrap(); + + assert!(matches!( + res, + DynamicStreamResponse::Single(Ok(val)) if val["message"] == "hello" + )); +} + +#[tokio::test] +async fn test_dynamic_stream_server_streaming_returns_stream() { + let mut client = setup_client(); + + let req = DynamicRequest { + service: "echo.EchoService".to_string(), + method: "ServerStreamingEcho".to_string(), + body: serde_json::json!({ "message": "stream" }), + headers: vec![], + }; + + let res = client.dynamic_stream(req).await.unwrap(); + + match res { + DynamicStreamResponse::Streaming(mut stream) => { + let mut messages = Vec::new(); + while let Some(item) = stream.next().await { + messages.push(item.unwrap()); + } + assert_eq!(messages.len(), 3); + assert_eq!(messages[0]["message"], "stream - seq 0"); + assert_eq!(messages[1]["message"], "stream - seq 1"); + assert_eq!(messages[2]["message"], "stream - seq 2"); + } + _ => panic!("Expected Streaming response"), + } +} + +#[tokio::test] +async fn test_dynamic_stream_bidirectional_returns_stream() { + let mut client = setup_client(); + + let req = DynamicRequest { + service: "echo.EchoService".to_string(), + method: "BidirectionalEcho".to_string(), + body: serde_json::json!([ + { "message": "Ping" }, + { "message": "Pong" } + ]), + headers: vec![], + }; + + let res = client.dynamic_stream(req).await.unwrap(); + + match res { + DynamicStreamResponse::Streaming(mut stream) => { + let mut messages = Vec::new(); + while let Some(item) = stream.next().await { + messages.push(item.unwrap()); + } + assert_eq!(messages.len(), 2); + assert_eq!(messages[0]["message"], "echo: Ping"); + assert_eq!(messages[1]["message"], "echo: Pong"); + } + _ => panic!("Expected Streaming response"), + } +}