From d63ff35f5430b55061da6932af9e0f204beecbab Mon Sep 17 00:00:00 2001 From: Arnaud Burlet Date: Thu, 2 Oct 2025 20:41:49 +0200 Subject: [PATCH] Use UnsafeBoundedQueue and avoid dispatcher --- .../scala/fs2/grpc/client/Fs2ClientCall.scala | 2 +- .../client/Fs2StreamClientCallListener.scala | 15 +++++------ .../server/Fs2StreamServerCallListener.scala | 4 +-- .../scala/fs2/grpc/shared/StreamIngest.scala | 26 +++++++++---------- .../fs2/grpc/shared/StreamIngestSuite.scala | 2 +- 5 files changed, 23 insertions(+), 26 deletions(-) diff --git a/runtime/src/main/scala/fs2/grpc/client/Fs2ClientCall.scala b/runtime/src/main/scala/fs2/grpc/client/Fs2ClientCall.scala index 2670dae0..98968587 100644 --- a/runtime/src/main/scala/fs2/grpc/client/Fs2ClientCall.scala +++ b/runtime/src/main/scala/fs2/grpc/client/Fs2ClientCall.scala @@ -108,7 +108,7 @@ class Fs2ClientCall[F[_], Request, Response] private[client] ( signalReadiness: SyncIO[Unit] ): Resource[F, Fs2StreamClientCallListener[F, Response]] = { val prefetchN = options.prefetchN.max(1) - val create = Fs2StreamClientCallListener.create[F, Response](request, signalReadiness, dispatcher, prefetchN) + val create = Fs2StreamClientCallListener.create[F, Response](request, signalReadiness, prefetchN) val acquire = start(create, md) val release = handleExitCase(cancelSucceed = true) diff --git a/runtime/src/main/scala/fs2/grpc/client/Fs2StreamClientCallListener.scala b/runtime/src/main/scala/fs2/grpc/client/Fs2StreamClientCallListener.scala index 3844bec0..bd2fd531 100644 --- a/runtime/src/main/scala/fs2/grpc/client/Fs2StreamClientCallListener.scala +++ b/runtime/src/main/scala/fs2/grpc/client/Fs2StreamClientCallListener.scala @@ -25,23 +25,21 @@ package client import cats.effect.SyncIO import cats.implicits._ -import cats.effect.kernel.Concurrent -import cats.effect.std.Dispatcher +import cats.effect.kernel.Async import fs2.grpc.shared.StreamIngest import io.grpc.{ClientCall, Metadata, Status} private[client] class Fs2StreamClientCallListener[F[_], Response] private ( ingest: StreamIngest[F, Response], - signalReadiness: SyncIO[Unit], - dispatcher: Dispatcher[F] + signalReadiness: SyncIO[Unit] ) extends ClientCall.Listener[Response] { override def onMessage(message: Response): Unit = - dispatcher.unsafeRunSync(ingest.onMessage(message)) + ingest.unsafeOnMessage(message) override def onClose(status: Status, trailers: Metadata): Unit = { val error = if (status.isOk) None else Some(status.asRuntimeException(trailers)) - dispatcher.unsafeRunSync(ingest.onClose(error)) + ingest.unsafeOnClose(error) } override def onReady(): Unit = signalReadiness.unsafeRunSync() @@ -51,14 +49,13 @@ private[client] class Fs2StreamClientCallListener[F[_], Response] private ( private[client] object Fs2StreamClientCallListener { - def create[F[_]: Concurrent, Response]( + def create[F[_]: Async, Response]( request: Int => F[Unit], signalReadiness: SyncIO[Unit], - dispatcher: Dispatcher[F], prefetchN: Int ): F[Fs2StreamClientCallListener[F, Response]] = StreamIngest[F, Response](request, prefetchN).map( - new Fs2StreamClientCallListener[F, Response](_, signalReadiness, dispatcher) + new Fs2StreamClientCallListener[F, Response](_, signalReadiness) ) } diff --git a/runtime/src/main/scala/fs2/grpc/server/Fs2StreamServerCallListener.scala b/runtime/src/main/scala/fs2/grpc/server/Fs2StreamServerCallListener.scala index 15fc5ec9..c470d8e3 100644 --- a/runtime/src/main/scala/fs2/grpc/server/Fs2StreamServerCallListener.scala +++ b/runtime/src/main/scala/fs2/grpc/server/Fs2StreamServerCallListener.scala @@ -45,12 +45,12 @@ private[server] class Fs2StreamServerCallListener[F[_], Request, Response] priva dispatcher.unsafeRunSync(isCancelled.complete(()).void) override def onMessage(message: Request): Unit = - dispatcher.unsafeRunSync(ingest.onMessage(message)) + ingest.unsafeOnMessage(message) override def onReady(): Unit = signalReadiness.unsafeRunSync() override def onHalfClose(): Unit = - dispatcher.unsafeRunSync(ingest.onClose(None)) + ingest.unsafeOnClose(None) override def source: Stream[F, Request] = ingest.messages } diff --git a/runtime/src/main/scala/fs2/grpc/shared/StreamIngest.scala b/runtime/src/main/scala/fs2/grpc/shared/StreamIngest.scala index 8768474d..9ceca5b5 100644 --- a/runtime/src/main/scala/fs2/grpc/shared/StreamIngest.scala +++ b/runtime/src/main/scala/fs2/grpc/shared/StreamIngest.scala @@ -23,38 +23,38 @@ package fs2 package grpc package shared -import cats.implicits._ -import cats.effect.Concurrent -import cats.effect.std.Queue +import cats.implicits.* +import cats.effect.Async +import cats.effect.std.{Queue, unsafe} private[grpc] trait StreamIngest[F[_], T] { - def onMessage(msg: T): F[Unit] - def onClose(error: Option[Throwable]): F[Unit] + def unsafeOnMessage(msg: T): Unit + def unsafeOnClose(error: Option[Throwable]): Unit def messages: Stream[F, T] } private[grpc] object StreamIngest { - def apply[F[_]: Concurrent, T]( + def apply[F[_]: Async, T]( request: Int => F[Unit], prefetchN: Int ): F[StreamIngest[F, T]] = Queue - .unbounded[F, Either[Option[Throwable], T]] + .unsafeUnbounded[F, Either[Option[Throwable], T]] .map(q => create[F, T](request, prefetchN, q)) def create[F[_], T]( request: Int => F[Unit], prefetchN: Int, - queue: Queue[F, Either[Option[Throwable], T]] - )(implicit F: Concurrent[F]): StreamIngest[F, T] = new StreamIngest[F, T] { + queue: unsafe.UnboundedQueue[F, Either[Option[Throwable], T]] + )(implicit F: Async[F]): StreamIngest[F, T] = new StreamIngest[F, T] { private val limit: Int = math.max(1, prefetchN) - def onMessage(msg: T): F[Unit] = - queue.offer(msg.asRight) + def unsafeOnMessage(msg: T): Unit = + queue.unsafeOffer(msg.asRight) - def onClose(error: Option[Throwable]): F[Unit] = - queue.offer(error.asLeft) + def unsafeOnClose(error: Option[Throwable]): Unit = + queue.unsafeOffer(error.asLeft) val messages: Stream[F, T] = { type Requested = Int diff --git a/runtime/src/test/scala/fs2/grpc/shared/StreamIngestSuite.scala b/runtime/src/test/scala/fs2/grpc/shared/StreamIngestSuite.scala index dd1e0d07..7fb5e1d9 100644 --- a/runtime/src/test/scala/fs2/grpc/shared/StreamIngestSuite.scala +++ b/runtime/src/test/scala/fs2/grpc/shared/StreamIngestSuite.scala @@ -34,7 +34,7 @@ class StreamIngestSuite extends CatsEffectSuite with CatsEffectFunFixtures { for { ref <- IO.ref(0) ingest <- StreamIngest[IO, Int](req => ref.update(_ + req), prefetchN) - _ <- Stream.emits((1 to prefetchN)).evalTap(ingest.onMessage).compile.drain + _ <- Stream.emits((1 to prefetchN)).evalTap(m => IO(ingest.unsafeOnMessage(m))).compile.drain messages <- ingest.messages.take(takeN.toLong).compile.toList requested <- ref.get } yield {