Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions granc-core/src/client/online_without_reflection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! but uses a local, in-memory `DescriptorPool` (Static schema) to resolve messages.
use super::{DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection};
use crate::{BoxError, client::OfflineReflectionState, grpc::client::GrpcRequestError};
use futures_util::{Stream, StreamExt};
use futures_util::{Stream, StreamExt, stream};
use http_body::Body as HttpBody;
use std::fmt::Debug;

Expand Down Expand Up @@ -76,8 +76,10 @@ where
.server_streaming(method, request.body, request.headers)
.await?
{
Ok(stream) => Ok(DynamicResponse::Streaming(Ok(stream.collect().await))),
Err(status) => Ok(DynamicResponse::Streaming(Err(status))),
Ok(stream) => Ok(DynamicResponse::Streaming(stream.boxed())),
Err(status) => Ok(DynamicResponse::Streaming(
stream::once(async { Err(status) }).boxed(),
)),
},
(true, false) => {
let input_stream =
Expand All @@ -98,8 +100,10 @@ where
.bidirectional_streaming(method, input_stream, request.headers)
.await?
{
Ok(stream) => Ok(DynamicResponse::Streaming(Ok(stream.collect().await))),
Err(status) => Ok(DynamicResponse::Streaming(Err(status))),
Ok(stream) => Ok(DynamicResponse::Streaming(stream.boxed())),
Err(status) => Ok(DynamicResponse::Streaming(
stream::once(async { Err(status) }).boxed(),
)),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions granc-core/src/client/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures_util::stream::BoxStream;
use prost_reflect::{EnumDescriptor, MessageDescriptor, ServiceDescriptor};
use std::fmt::Debug;

Expand All @@ -17,12 +18,11 @@ pub struct DynamicRequest {
}

/// The result of a dynamic gRPC call.
#[derive(Debug, Clone)]
pub enum DynamicResponse {
/// A single response message (for Unary and Client Streaming calls).
Unary(Result<serde_json::Value, tonic::Status>),
/// A stream of response messages (for Server Streaming and Bidirectional calls).
Streaming(Result<Vec<Result<serde_json::Value, tonic::Status>>, tonic::Status>),
Streaming(BoxStream<'static, Result<serde_json::Value, tonic::Status>>),
}

/// A generic wrapper for different types of Protobuf descriptors.
Expand Down
10 changes: 2 additions & 8 deletions granc-core/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ where
method: MethodDescriptor,
payload: serde_json::Value,
headers: Vec<(String, String)>,
) -> Result<
Result<impl Stream<Item = Result<serde_json::Value, tonic::Status>>, tonic::Status>,
GrpcRequestError,
> {
) -> Result<Result<tonic::Streaming<serde_json::Value>, tonic::Status>, GrpcRequestError> {
self.client
.ready()
.await
Expand Down Expand Up @@ -163,10 +160,7 @@ where
method: MethodDescriptor,
payload_stream: impl Stream<Item = serde_json::Value> + Send + 'static,
headers: Vec<(String, String)>,
) -> Result<
Result<impl Stream<Item = Result<serde_json::Value, tonic::Status>>, tonic::Status>,
GrpcRequestError,
> {
) -> Result<Result<tonic::Streaming<serde_json::Value>, tonic::Status>, GrpcRequestError> {
self.client
.ready()
.await
Expand Down
5 changes: 4 additions & 1 deletion granc-core/tests/granc_client_online_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use echo_service_impl::EchoServiceImpl;
use granc_core::client::{DynamicRequest, DynamicResponse, GrancClient, Online, online};
use granc_core::reflection::client::ReflectionResolveError;
use granc_test_support::echo_service::{EchoServiceServer, FILE_DESCRIPTOR_SET};
use tokio_stream::StreamExt;
use tonic::Code;
use tonic::service::Routes;

Expand Down Expand Up @@ -62,7 +63,9 @@ async fn test_reflection_server_streaming_success() {
let res = client.dynamic(req).await.unwrap();

match res {
DynamicResponse::Streaming(Ok(stream)) => {
DynamicResponse::Streaming(stream) => {
let stream: Vec<_> = stream.collect().await;

assert_eq!(stream.len(), 3);
assert_eq!(stream[0].as_ref().unwrap()["message"], "stream - seq 0");
assert_eq!(stream[1].as_ref().unwrap()["message"], "stream - seq 1");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use echo_service_impl::EchoServiceImpl;
use futures_util::StreamExt;
use granc_core::client::{
DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection,
online_without_reflection,
Expand Down Expand Up @@ -50,7 +51,9 @@ async fn test_dynamic_server_streaming_success() {
let res = client.dynamic(req).await.unwrap();

match res {
DynamicResponse::Streaming(Ok(stream)) => {
DynamicResponse::Streaming(stream) => {
let stream: Vec<_> = stream.collect().await;

assert_eq!(stream.len(), 3);
assert_eq!(stream[0].as_ref().unwrap()["message"], "stream - seq 0");
assert_eq!(stream[1].as_ref().unwrap()["message"], "stream - seq 1");
Expand Down Expand Up @@ -101,7 +104,9 @@ async fn test_dynamic_bidirectional_streaming_success() {
let res = client.dynamic(req).await.unwrap();

match res {
DynamicResponse::Streaming(Ok(stream)) => {
DynamicResponse::Streaming(stream) => {
let stream: Vec<_> = stream.collect().await;

assert_eq!(stream.len(), 2);
assert_eq!(stream[0].as_ref().unwrap()["message"], "echo: Ping");
assert_eq!(stream[1].as_ref().unwrap()["message"], "echo: Pong");
Expand Down