To be more precise, onClose can be called on the underlying Fs2StreamClientCallListener after the client (and its dispatcher) has already been closed. I don't have a full end-to-end example, but it comes down to this:
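import scala.concurrent.duration._
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global

val someServiceResource: Resource[IO, SomeServiceFs2Grpc[IO, Unit]] = ???
someServiceResource
  .map(
    // Interrupt the server stream after one second
    _.someStream(SomeStreamRequest(), ()).interruptAfter(1.second).compile.drain
  )
  .useEval
  .unsafeRunSync()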
Output:
Jun 07, 2023 1:52:23 PM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed@13affa0e
java.lang.IllegalStateException: dispatcher already shutdown
at cats.effect.std.Dispatcher$$anon$2.unsafeToFutureCancelable(Dispatcher.scala:422)
at cats.effect.std.DispatcherPlatform.unsafeRunTimed(DispatcherPlatform.scala:59)
at cats.effect.std.DispatcherPlatform.unsafeRunTimed$(DispatcherPlatform.scala:58)
at cats.effect.std.Dispatcher$$anon$2.unsafeRunTimed(Dispatcher.scala:317)
at cats.effect.std.DispatcherPlatform.unsafeRunSync(DispatcherPlatform.scala:51)
at cats.effect.std.DispatcherPlatform.unsafeRunSync$(DispatcherPlatform.scala:50)
at cats.effect.std.Dispatcher$$anon$2.unsafeRunSync(Dispatcher.scala:317)
at fs2.grpc.client.Fs2StreamClientCallListener.onClose(Fs2StreamClientCallListener.scala:43)
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:468)
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:432)
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:465)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
So interruptAfter interrupts the stream early, after which the surrounding resource is closed immediately. But onClose is still called afterwards and tries to run an effect on the already-closed dispatcher.
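The failure mode can probably be reproduced without gRPC at all. The following is a minimal sketch, not the actual fs2-grpc code path: it assumes cats-effect 3.4 or later (for Dispatcher.parallel), and the object and method names are made up for illustration. It releases a dispatcher and then uses it, the way a late onClose callback would.

```scala
import cats.effect.IO
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global
import scala.util.Try

// Hypothetical names; this only simulates the late callback.
object DispatcherAfterClose {
  def lateCall(): Try[Unit] = {
    // Allocate a dispatcher and release it right away, leaking the
    // reference out of the Resource scope (as the listener effectively does).
    val leaked: Dispatcher[IO] =
      Dispatcher.parallel[IO].use(d => IO.pure(d)).unsafeRunSync()

    // Stand-in for onClose firing after the client resource was closed.
    Try(leaked.unsafeRunSync(IO.unit))
  }

  def main(args: Array[String]): Unit =
    println(lateCall())
}
```

I would expect this to fail with the same IllegalStateException as in the trace above, although the exact behavior may depend on the cats-effect version.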