From f2f8d2338942db6e5abf926427309eacabf537bf Mon Sep 17 00:00:00 2001 From: Brian Holt Date: Thu, 8 Jan 2026 17:41:05 -0600 Subject: [PATCH 1/2] implement the Twitter Future cancellation protocol --- project/AsyncUtilsBuildPlugin.scala | 3 + .../scala/com/dwolla/util/async/twitter.scala | 62 +++++++--- .../twitter/TwitterFutureAsyncMapKTests.scala | 113 ++++++++++++++++++ 3 files changed, 163 insertions(+), 15 deletions(-) create mode 100644 twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala diff --git a/project/AsyncUtilsBuildPlugin.scala b/project/AsyncUtilsBuildPlugin.scala index 03cfcccae..48d7e897c 100644 --- a/project/AsyncUtilsBuildPlugin.scala +++ b/project/AsyncUtilsBuildPlugin.scala @@ -158,6 +158,9 @@ object AsyncUtilsBuildPlugin extends AutoPlugin { Seq( "org.typelevel" %% "cats-effect" % CatsEffect3V, "com.twitter" %% "util-core" % v, + "org.scalameta" %% "munit" % "1.2.1" % Test, + "org.typelevel" %% "munit-cats-effect" % "2.1.0" % Test, + "org.typelevel" %% "scalacheck-effect-munit" % "2.1.0-RC1" % Test, ) ++ (if (scalaVersion.value.startsWith("2")) scala2CompilerPlugins else Nil) }, mimaPreviousArtifacts += organizationName.value %% name.value % "0.3.0", diff --git a/twitter-futures/src/main/scala/com/dwolla/util/async/twitter.scala b/twitter-futures/src/main/scala/com/dwolla/util/async/twitter.scala index 01d307092..968cf990c 100644 --- a/twitter-futures/src/main/scala/com/dwolla/util/async/twitter.scala +++ b/twitter-futures/src/main/scala/com/dwolla/util/async/twitter.scala @@ -1,13 +1,17 @@ package com.dwolla.util.async -import cats._ -import cats.data._ -import cats.effect._ -import cats.syntax.all._ -import cats.tagless._ -import cats.tagless.syntax.all._ +import cats.* +import cats.data.* +import cats.effect.* +import cats.syntax.all.* +import cats.tagless.* +import cats.tagless.syntax.all.* +import com.dwolla.util.async.twitter.CancelledViaCatsEffect import com.twitter.util +import java.util.concurrent.CancellationException +import scala.util.control.NoStackTrace + object twitter extends ToAsyncFunctorKOps { implicit def twitterFutureAsyncFunctorK[F[_]]: util.Future ~~> F = new (util.Future ~~> F) { override def asyncMapK[Alg[_[_]] : FunctorK](alg: Alg[util.Future]) @@ -19,6 +23,10 @@ object twitter extends ToAsyncFunctorKOps { def provide[F[_]] = new PartiallyAppliedProvide[F] def liftFuture[F[_]] = new PartiallyAppliedLiftFuture[F] + + private[async] case object CancelledViaCatsEffect + extends CancellationException("Cancelled via cats-effect") + with NoStackTrace } class PartiallyAppliedProvide[F[_]](private val dummy: Boolean = true) extends AnyVal { @@ -34,15 +42,39 @@ class PartiallyAppliedProvide[F[_]](private val dummy: Boolean = true) extends A } class PartiallyAppliedLiftFuture[F[_]] { - def apply[A](fa: F[util.Future[A]]) - (implicit - F: Async[F]): F[A] = - Async[F].async[A] { cb => - fa.map { - _.respond { - case util.Return(a) => cb(Right(a)) - case util.Throw(ex) => cb(Left(ex)) + def apply[A](ffa: F[util.Future[A]]) + (implicit F: Async[F]): F[A] = + MonadCancelThrow[F].uncancelable { (poll: Poll[F]) => + poll { + Async[F].async[A] { cb: (Either[Throwable, A] => Unit) => + ffa + .flatMap { fa => + Sync[F].delay { + fa.respond { + case util.Return(a) => cb(Right(a)) + case util.Throw(ex) => cb(Left(ex)) + } + } + } + .map { fa => + Sync[F].delay { + fa.raise(CancelledViaCatsEffect) + }.some + } } - }.as(None) + } + .recoverWith(recoverFromCancelledViaCatsEffect) } + + /** + * According to CE maintainer Daniel Spiewak in Discord, there's + * a race condition in the CE runtime that means sometimes it will + * see the future as completed (with the `CancelledViaCatsEffect` + * exception) before it transitions into the canceled state. This + * `recoverWith` should prevent that from happening. + */ + private final def recoverFromCancelledViaCatsEffect[A](implicit F: Async[F]): PartialFunction[Throwable, F[A]] = { + case CancelledViaCatsEffect => + Async[F].canceled >> Async[F].never + } } diff --git a/twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala b/twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala new file mode 100644 index 000000000..66618711e --- /dev/null +++ b/twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala @@ -0,0 +1,113 @@ +package com.dwolla.util.async + +import cats.effect.* +import cats.effect.std.* +import cats.syntax.all.* +import com.dwolla.util.async.twitter.{CancelledViaCatsEffect, liftFuture} +import com.twitter.util.{Duration as _, *} +import munit.{CatsEffectSuite, ScalaCheckEffectSuite} +import org.scalacheck.{Prop, Test} +import org.scalacheck.effect.PropF + +import java.util.concurrent.CancellationException +import scala.concurrent.duration.* + +class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectSuite { + override def munitIOTimeout: Duration = 1.minute + + override protected def scalaCheckTestParameters: Test.Parameters = + super.scalaCheckTestParameters.withMinSuccessfulTests(100000) + + test("lift a Twitter Future into IO") { + PropF.forAllF { (i: Int) => + for { + promise <- IO(Promise[Int]()) + (x, _) <- liftFuture[IO](IO(promise)).both(IO(promise.setValue(i))) + } yield { + assertEquals(x, i) + } + } + } + + test("cancelling a running Twitter Future lifted into IO should interrupt the underlying Twitter Future") { + for { + promise <- IO(Promise[Int]()) + startedLatch <- CountDownLatch[IO](1) + fiber <- IO.uncancelable { poll => // we want only the Future to be cancellable + poll(liftFuture[IO](startedLatch.release.as(promise))).start + } + _ <- startedLatch.await + _ <- fiber.cancel + } yield { + assert(promise.isInterrupted.isDefined) + } + } + + test("a running Twitter Future lifted into IO can be completed or cancelled") { + PropF.forAllF { (i: Option[Int]) => + (Supervisor[IO](await = true), Dispatcher.parallel[IO](await = true)) + .tupled + .use { case (supervisor, dispatcher) => + for { + capturedInterruptionThrowable <- Deferred[IO, Throwable] + twitterPromise <- IO(new Promise[Option[Int]]()).flatTap(captureThrowableOnInterruption(dispatcher, capturedInterruptionThrowable)) + startedLatch <- CountDownLatch[IO](1) + promiseFiber <- IO.uncancelable { poll => // we want only the Future to be cancellable + supervisor.supervise(poll(liftFuture[IO](startedLatch.release.as(twitterPromise)))) + } + _ <- startedLatch.await + _ <- completeOrCancel(i, twitterPromise, promiseFiber) + cancelledRef <- Ref[IO].of(false) + outcome <- promiseFiber.joinWith(cancelledRef.set(true).as(None)) + wasCancelled <- cancelledRef.get + + expectCancellation = i.isEmpty + _ <- interceptMessageIO[CancellationException]("Cancelled via cats-effect") { + capturedInterruptionThrowable + .get + .timeout(10.millis) + .map(_.asLeft) + .rethrow // interceptMessageIO works by throwing an exception, so we need to rethrow it to get the message + } + .whenA(expectCancellation) + } yield { + assertEquals(outcome, i) + assertEquals(wasCancelled, i.as(false).getOrElse(true)) + assertEquals(Option(CancelledViaCatsEffect).filter(_ => expectCancellation), twitterPromise.isInterrupted) + } + } + } + } + + // just here to make sure we understand how Twitter Future / Promise handles interruption + test("the Twitter Future cancellation protocol") { + Prop.forAll { (throwable: Throwable) => + val promise = Promise[Int]() + + promise.raise(throwable) + + assertEquals(promise.isInterrupted, throwable.some) + } + } + + private def captureThrowableOnInterruption[F[_] : Sync, A](dispatcher: Dispatcher[F], + capture: Deferred[F, Throwable]) + (p: Promise[A]): F[Unit] = + Sync[F].delay { + p.setInterruptHandler { case ex => + dispatcher.unsafeRunSync(capture.complete(ex).void) + } + } + + private def completeOrCancel[F[_] : Sync, A](maybeA: Option[A], + promise: Promise[Option[A]], + fiber: Fiber[F, Throwable, Option[A]]): F[Unit] = + maybeA match { + case Some(a) => Sync[F].delay { + promise.setValue(a.some) + } + case None => + fiber.cancel + } + +} From 6d48a3d90342477c7368843524818daca8229a2a Mon Sep 17 00:00:00 2001 From: Brian Holt Date: Fri, 16 Jan 2026 19:04:36 -0600 Subject: [PATCH 2/2] update TwitterFutureAsyncMapKTests to also cover completing promise with failure --- project/AsyncUtilsBuildPlugin.scala | 1 + .../twitter/TwitterFutureAsyncMapKTests.scala | 109 +++++++++++------- 2 files changed, 70 insertions(+), 40 deletions(-) diff --git a/project/AsyncUtilsBuildPlugin.scala b/project/AsyncUtilsBuildPlugin.scala index 48d7e897c..f132ee301 100644 --- a/project/AsyncUtilsBuildPlugin.scala +++ b/project/AsyncUtilsBuildPlugin.scala @@ -157,6 +157,7 @@ object AsyncUtilsBuildPlugin extends AutoPlugin { libraryDependencies ++= { Seq( "org.typelevel" %% "cats-effect" % CatsEffect3V, + "org.typelevel" %% "cats-effect-testkit" % CatsEffect3V, "com.twitter" %% "util-core" % v, "org.scalameta" %% "munit" % "1.2.1" % Test, "org.typelevel" %% "munit-cats-effect" % "2.1.0" % Test, diff --git a/twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala b/twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala index 66618711e..84be1cd7f 100644 --- a/twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala +++ b/twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala @@ -1,16 +1,21 @@ package com.dwolla.util.async +import cats.* import cats.effect.* import cats.effect.std.* +import cats.effect.testkit.TestControl import cats.syntax.all.* +import cats.effect.syntax.all.* import com.dwolla.util.async.twitter.{CancelledViaCatsEffect, liftFuture} import com.twitter.util.{Duration as _, *} -import munit.{CatsEffectSuite, ScalaCheckEffectSuite} -import org.scalacheck.{Prop, Test} +import munit.{AnyFixture, CatsEffectSuite, ScalaCheckEffectSuite} +import org.scalacheck.Arbitrary.arbitrary import org.scalacheck.effect.PropF +import org.scalacheck.{Arbitrary, Gen, Prop, Test} import java.util.concurrent.CancellationException import scala.concurrent.duration.* +import scala.util.control.NoStackTrace class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectSuite { override def munitIOTimeout: Duration = 1.minute @@ -43,39 +48,46 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS } } - test("a running Twitter Future lifted into IO can be completed or cancelled") { - PropF.forAllF { (i: Option[Int]) => - (Supervisor[IO](await = true), Dispatcher.parallel[IO](await = true)) - .tupled - .use { case (supervisor, dispatcher) => - for { - capturedInterruptionThrowable <- Deferred[IO, Throwable] - twitterPromise <- IO(new Promise[Option[Int]]()).flatTap(captureThrowableOnInterruption(dispatcher, capturedInterruptionThrowable)) - startedLatch <- CountDownLatch[IO](1) - promiseFiber <- IO.uncancelable { poll => // we want only the Future to be cancellable - supervisor.supervise(poll(liftFuture[IO](startedLatch.release.as(twitterPromise)))) - } - _ <- startedLatch.await - _ <- completeOrCancel(i, twitterPromise, promiseFiber) - cancelledRef <- Ref[IO].of(false) - outcome <- promiseFiber.joinWith(cancelledRef.set(true).as(None)) - wasCancelled <- cancelledRef.get - - expectCancellation = i.isEmpty - _ <- interceptMessageIO[CancellationException]("Cancelled via cats-effect") { - capturedInterruptionThrowable - .get - .timeout(10.millis) - .map(_.asLeft) - .rethrow // interceptMessageIO works by throwing an exception, so we need to rethrow it to get the message - } - .whenA(expectCancellation) - } yield { - assertEquals(outcome, i) - assertEquals(wasCancelled, i.as(false).getOrElse(true)) - assertEquals(Option(CancelledViaCatsEffect).filter(_ => expectCancellation), twitterPromise.isInterrupted) + private val supervisorAndDispatcher = ResourceTestLocalFixture("supervisorAndDispatcher", + Supervisor[IO](await = true).product(Dispatcher.sequential[IO](await = true)) + ) + + override def munitFixtures: Seq[AnyFixture[?]] = super.munitFixtures ++ Seq(supervisorAndDispatcher) + + test("a running Twitter Future lifted into IO can be completed (as success or failure) or cancelled") { + PropF.forAllF { (i: Outcome[IO, Throwable, Int]) => + val (supervisor, dispatcher) = supervisorAndDispatcher() + + TestControl.executeEmbed { + for { + expectedResult <- i.embed(CancelledViaCatsEffect.raiseError[IO, Int]).attempt + capturedInterruptionThrowable <- Deferred[IO, Throwable] + twitterPromise <- IO(new Promise[Int]()).flatTap(captureThrowableOnInterruption(dispatcher, capturedInterruptionThrowable)) + startedLatch <- CountDownLatch[IO](1) + promiseFiber <- IO.uncancelable { poll => // we want only the Future to be cancellable + supervisor.supervise(poll(liftFuture[IO](startedLatch.release.as(twitterPromise)))) + } + _ <- startedLatch.await + + (outcome, _) <- promiseFiber.join.both(completeOrCancel(i, twitterPromise, promiseFiber)) + + outcomeEmittedValue <- outcome.embed(CancelledViaCatsEffect.raiseError[IO, Int]).attempt + + expectCancellation = i.isCanceled + _ <- interceptMessageIO[CancellationException]("Cancelled via cats-effect") { + capturedInterruptionThrowable + .get + .timeout(10.millis) + .map(_.asLeft) + .rethrow // interceptMessageIO works by throwing an exception, so we need to rethrow it to get the message } + .whenA(expectCancellation) + } yield { + assertEquals(outcomeEmittedValue, expectedResult) + assertEquals(outcome.isCanceled, i.isCanceled) + assertEquals(Option(CancelledViaCatsEffect).filter(_ => expectCancellation), twitterPromise.isInterrupted) } + } } } @@ -90,6 +102,14 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS } } + private def genOutcome[F[_] : Applicative, A: Arbitrary]: Gen[Outcome[F, Throwable, A]] = + Gen.oneOf( + arbitrary[A].map(_.pure[F]).map(Outcome.succeeded[F, Throwable, A]), + Gen.const(new RuntimeException("arbitrary exception") with NoStackTrace).map(Outcome.errored[F, Throwable, A]), + Gen.const(Outcome.canceled[F, Throwable, A]), + ) + private implicit def arbOutcome[F[_] : Applicative, A: Arbitrary]: Arbitrary[Outcome[F, Throwable, A]] = Arbitrary(genOutcome) + private def captureThrowableOnInterruption[F[_] : Sync, A](dispatcher: Dispatcher[F], capture: Deferred[F, Throwable]) (p: Promise[A]): F[Unit] = @@ -99,14 +119,23 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS } } - private def completeOrCancel[F[_] : Sync, A](maybeA: Option[A], - promise: Promise[Option[A]], - fiber: Fiber[F, Throwable, Option[A]]): F[Unit] = + private def completeOrCancel[F[_] : Async, A](maybeA: Outcome[F, Throwable, A], + promise: Promise[A], + fiber: Fiber[F, Throwable, A]): F[Unit] = maybeA match { - case Some(a) => Sync[F].delay { - promise.setValue(a.some) - } - case None => + case Outcome.Succeeded(fa) => + fa.flatMap(a => Sync[F].delay(promise.setValue(a))).void + + case Outcome.Errored(ex) => + // If the fiber is in the background (i.e. not joined) when it completes with an exception, + // the IO runtime will print its stacktrace to stderr. We always plan to `join` the fiber + // our tests are complete, so this feels like a false error report. + + // To work around the issue, we delay the completion of the promise to make sure the fiber + // is joined before the promise is completed with the exception. + Sync[F].delay(promise.setException(ex)).delayBy(10.millis) + + case Outcome.Canceled() => fiber.cancel }