Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ public final class arrow/fx/coroutines/ExitCase$Failure : arrow/fx/coroutines/Ex
public fun toString ()Ljava/lang/String;
}

public final class arrow/fx/coroutines/FailureValue {
public static final field INSTANCE Larrow/fx/coroutines/FailureValue;
public final fun bindNel (Larrow/core/raise/RaiseAccumulate;Ljava/lang/Object;)Ljava/lang/Object;
public final fun failureValue (Ljava/lang/Object;)Ljava/lang/Object;
public final fun mightFail (Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/FlowExtensions {
public static final fun fixedRate (JZLkotlin/jvm/functions/Function0;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun fixedRate$default (JZLkotlin/jvm/functions/Function0;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package arrow.fx.coroutines
import arrow.core.raise.RaiseAccumulate
import arrow.core.Either
import arrow.core.NonEmptyList
import arrow.core.flattenOrAccumulate
import arrow.core.mapOrAccumulate
import arrow.core.raise.Raise
import arrow.core.raise.either
import arrow.fx.coroutines.FailureValue.bindNel
import arrow.fx.coroutines.FailureValue.mightFail
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
Expand Down Expand Up @@ -59,16 +60,13 @@ public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
transform: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<Error, List<B>> =
coroutineScope {
val semaphore = Semaphore(concurrency)
map {
async(context) {
either {
semaphore.withPermit {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}
parMap(context, concurrency) {
mightFail {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}.awaitAll().flattenOrAccumulate(combine)
}.mapOrAccumulate(combine) { maybeFailure ->
bindNel<Error, B>(maybeFailure)
}
}

public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
Expand All @@ -77,13 +75,13 @@ public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
transform: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<Error, List<B>> =
coroutineScope {
map {
async(context) {
either {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
parMap(context) {
mightFail {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}.awaitAll().flattenOrAccumulate(combine)
}.mapOrAccumulate(combine) { maybeFailure ->
bindNel<Error, B>(maybeFailure)
}
}

public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
Expand All @@ -92,28 +90,25 @@ public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
transform: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<NonEmptyList<Error>, List<B>> =
coroutineScope {
val semaphore = Semaphore(concurrency)
map {
async(context) {
either {
semaphore.withPermit {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}
parMap(context, concurrency) {
mightFail {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}.awaitAll().flattenOrAccumulate()
}.mapOrAccumulate { maybeFailure ->
bindNel<Error, B>(maybeFailure)
}
}

public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
context: CoroutineContext = EmptyCoroutineContext,
transform: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<NonEmptyList<Error>, List<B>> =
coroutineScope {
map {
async(context) {
either {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
parMap(context) {
mightFail {
transform(ScopedRaiseAccumulate(this, this@coroutineScope), it)
}
}.awaitAll().flattenOrAccumulate()
}.mapOrAccumulate { maybeFailure ->
bindNel<Error, B>(maybeFailure)
}
}
Loading