From 8d63209b5ff2f0914fb18a29ab12fca97ed7216e Mon Sep 17 00:00:00 2001 From: JasterV <49537445+JasterV@users.noreply.github.com> Date: Sun, 1 Mar 2026 19:52:48 +0100 Subject: [PATCH] refactor: return a true stream instead of a vector in the dynamic response --- granc-core/src/client/online_without_reflection.rs | 14 +++++++++----- granc-core/src/client/types.rs | 4 ++-- granc-core/src/grpc/client.rs | 10 ++-------- granc-core/tests/granc_client_online_test.rs | 5 ++++- .../granc_client_online_without_reflection_test.rs | 9 +++++++-- 5 files changed, 24 insertions(+), 18 deletions(-) diff --git a/granc-core/src/client/online_without_reflection.rs b/granc-core/src/client/online_without_reflection.rs index d251bd0..24a15c6 100644 --- a/granc-core/src/client/online_without_reflection.rs +++ b/granc-core/src/client/online_without_reflection.rs @@ -4,7 +4,7 @@ //! but uses a local, in-memory `DescriptorPool` (Static schema) to resolve messages. use super::{DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection}; use crate::{BoxError, client::OfflineReflectionState, grpc::client::GrpcRequestError}; -use futures_util::{Stream, StreamExt}; +use futures_util::{Stream, StreamExt, stream}; use http_body::Body as HttpBody; use std::fmt::Debug; @@ -76,8 +76,10 @@ where .server_streaming(method, request.body, request.headers) .await? { - Ok(stream) => Ok(DynamicResponse::Streaming(Ok(stream.collect().await))), - Err(status) => Ok(DynamicResponse::Streaming(Err(status))), + Ok(stream) => Ok(DynamicResponse::Streaming(stream.boxed())), + Err(status) => Ok(DynamicResponse::Streaming( + stream::once(async { Err(status) }).boxed(), + )), }, (true, false) => { let input_stream = @@ -98,8 +100,10 @@ where .bidirectional_streaming(method, input_stream, request.headers) .await? { - Ok(stream) => Ok(DynamicResponse::Streaming(Ok(stream.collect().await))), - Err(status) => Ok(DynamicResponse::Streaming(Err(status))), + Ok(stream) => Ok(DynamicResponse::Streaming(stream.boxed())), + Err(status) => Ok(DynamicResponse::Streaming( + stream::once(async { Err(status) }).boxed(), + )), } } } diff --git a/granc-core/src/client/types.rs b/granc-core/src/client/types.rs index eae387a..c19c173 100644 --- a/granc-core/src/client/types.rs +++ b/granc-core/src/client/types.rs @@ -1,3 +1,4 @@ +use futures_util::stream::BoxStream; use prost_reflect::{EnumDescriptor, MessageDescriptor, ServiceDescriptor}; use std::fmt::Debug; @@ -17,12 +18,11 @@ pub struct DynamicRequest { } /// The result of a dynamic gRPC call. -#[derive(Debug, Clone)] pub enum DynamicResponse { /// A single response message (for Unary and Client Streaming calls). Unary(Result), /// A stream of response messages (for Server Streaming and Bidirectional calls). - Streaming(Result>, tonic::Status>), + Streaming(BoxStream<'static, Result>), } /// A generic wrapper for different types of Protobuf descriptors. diff --git a/granc-core/src/grpc/client.rs b/granc-core/src/grpc/client.rs index 5ed9164..3350233 100644 --- a/granc-core/src/grpc/client.rs +++ b/granc-core/src/grpc/client.rs @@ -104,10 +104,7 @@ where method: MethodDescriptor, payload: serde_json::Value, headers: Vec<(String, String)>, - ) -> Result< - Result>, tonic::Status>, - GrpcRequestError, - > { + ) -> Result, tonic::Status>, GrpcRequestError> { self.client .ready() .await @@ -163,10 +160,7 @@ where method: MethodDescriptor, payload_stream: impl Stream + Send + 'static, headers: Vec<(String, String)>, - ) -> Result< - Result>, tonic::Status>, - GrpcRequestError, - > { + ) -> Result, tonic::Status>, GrpcRequestError> { self.client .ready() .await diff --git a/granc-core/tests/granc_client_online_test.rs b/granc-core/tests/granc_client_online_test.rs index 912d4ed..ccca11c 100644 --- a/granc-core/tests/granc_client_online_test.rs +++ b/granc-core/tests/granc_client_online_test.rs @@ -2,6 +2,7 @@ use echo_service_impl::EchoServiceImpl; use granc_core::client::{DynamicRequest, DynamicResponse, GrancClient, Online, online}; use granc_core::reflection::client::ReflectionResolveError; use granc_test_support::echo_service::{EchoServiceServer, FILE_DESCRIPTOR_SET}; +use tokio_stream::StreamExt; use tonic::Code; use tonic::service::Routes; @@ -62,7 +63,9 @@ async fn test_reflection_server_streaming_success() { let res = client.dynamic(req).await.unwrap(); match res { - DynamicResponse::Streaming(Ok(stream)) => { + DynamicResponse::Streaming(stream) => { + let stream: Vec<_> = stream.collect().await; + assert_eq!(stream.len(), 3); assert_eq!(stream[0].as_ref().unwrap()["message"], "stream - seq 0"); assert_eq!(stream[1].as_ref().unwrap()["message"], "stream - seq 1"); diff --git a/granc-core/tests/granc_client_online_without_reflection_test.rs b/granc-core/tests/granc_client_online_without_reflection_test.rs index 07ba4fc..5337091 100644 --- a/granc-core/tests/granc_client_online_without_reflection_test.rs +++ b/granc-core/tests/granc_client_online_without_reflection_test.rs @@ -1,4 +1,5 @@ use echo_service_impl::EchoServiceImpl; +use futures_util::StreamExt; use granc_core::client::{ DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection, online_without_reflection, @@ -50,7 +51,9 @@ async fn test_dynamic_server_streaming_success() { let res = client.dynamic(req).await.unwrap(); match res { - DynamicResponse::Streaming(Ok(stream)) => { + DynamicResponse::Streaming(stream) => { + let stream: Vec<_> = stream.collect().await; + assert_eq!(stream.len(), 3); assert_eq!(stream[0].as_ref().unwrap()["message"], "stream - seq 0"); assert_eq!(stream[1].as_ref().unwrap()["message"], "stream - seq 1"); @@ -101,7 +104,9 @@ async fn test_dynamic_bidirectional_streaming_success() { let res = client.dynamic(req).await.unwrap(); match res { - DynamicResponse::Streaming(Ok(stream)) => { + DynamicResponse::Streaming(stream) => { + let stream: Vec<_> = stream.collect().await; + assert_eq!(stream.len(), 2); assert_eq!(stream[0].as_ref().unwrap()["message"], "echo: Ping"); assert_eq!(stream[1].as_ref().unwrap()["message"], "echo: Pong");