diff --git a/.changeset/breezy-mugs-raise.md b/.changeset/breezy-mugs-raise.md new file mode 100644 index 000000000..d5d583a35 --- /dev/null +++ b/.changeset/breezy-mugs-raise.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix potential leak for StreamSender caused by exceptions diff --git a/.changeset/eight-numbers-fold.md b/.changeset/eight-numbers-fold.md new file mode 100644 index 000000000..6e77ff674 --- /dev/null +++ b/.changeset/eight-numbers-fold.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Rethrow cancellation exceptions for coroutines diff --git a/.github/workflows/android.yml b/.github/workflows/android.yml index 74fe11ddc..5446f7434 100644 --- a/.github/workflows/android.yml +++ b/.github/workflows/android.yml @@ -49,9 +49,6 @@ jobs: - name: Grant execute permission for gradlew run: chmod +x gradlew - - name: Gradle clean - run: ./gradlew clean - - name: Spotless check if: github.event_name == 'pull_request' run: | diff --git a/build.gradle b/build.gradle index e304f47dd..ba01fff6e 100644 --- a/build.gradle +++ b/build.gradle @@ -24,6 +24,7 @@ buildscript { plugins { id("io.github.gradle-nexus.publish-plugin") version "2.0.0" + alias(libs.plugins.detekt) apply false } subprojects { @@ -32,6 +33,8 @@ subprojects { return } + def isDetektRulesModule = project.name == "livekit-detekt-rules" + apply plugin: "com.diffplug.spotless" spotless { // optional: limit format enforcement to just the files changed by this feature branch @@ -75,6 +78,28 @@ subprojects { } } + if (!isDetektRulesModule) { + apply plugin: "io.gitlab.arturbosch.detekt" + detekt { + parallel = true + buildUponDefaultConfig = true + allRules = false + config.from(files("$rootDir/config/detekt/detekt.yml")) + reports { + html.required.set(true) + md.required.set(true) + } + } + + dependencies { + detektPlugins project(":livekit-detekt-rules") + } + + tasks.withType(io.gitlab.arturbosch.detekt.Detekt).configureEach { + pluginClasspath.setFrom(configurations.detektPlugins) + } + } + // From Gradle 8 onwards, Kapt no longer automatically picks up jvmTarget // from normal KotlinOptions. Must be manually set. // JvmToolchain should not be used since it changes the actual JDK used. diff --git a/config/detekt/detekt.yml b/config/detekt/detekt.yml new file mode 100644 index 000000000..537579a1d --- /dev/null +++ b/config/detekt/detekt.yml @@ -0,0 +1,23 @@ +# Detekt: smell-oriented checks only. Formatting and naming conventions are handled by Spotless/ktlint. +config: + validation: true + warningsAsErrors: true + excludes: '' + +style: + active: false + +naming: + active: false + +coroutines: + SuspendFunSwallowedCancellation: + active: true + +livekit-rules: + NoWithTimeout: + active: true + +exceptions: + TooGenericExceptionCaught: + active: false \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index c32534172..67da82dee 100644 --- a/gradle.properties +++ b/gradle.properties @@ -21,6 +21,15 @@ android.enableJetifier=false kotlin.code.style=official org.gradle.caching=true +org.gradle.configuration-cache=true +org.gradle.configureondemand=true +org.gradle.daemon=true + +kotlin.incremental=true +kapt.incremental.apt=true +kapt.use.worker.api=true + + ############################################################### GROUP=io.livekit @@ -62,8 +71,3 @@ RELEASE_SIGNING_ENABLED=true livekitUrl= livekitApiKey= livekitApiSecret= - -org.gradle.configuration-cache=true -kotlin.incremental=true -org.gradle.configureondemand=true -org.gradle.daemon=true diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2fb829ece..01409e2a3 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -10,6 +10,7 @@ androidx-lifecycle = "2.8.0" audioswitch = "70efa204dda4f468f989b4b3e1b5ecb6ffe56ceb" autoService = '1.0.1' coroutines = "1.6.0" +detekt = "1.23.8" dagger = "2.46" groupie = "2.9.0" junit-lib = "4.13.2" @@ -29,6 +30,8 @@ material = "1.12.0" viewpager2 = "1.0.0" noise = "2.0.0" lifecycleProcess = "2.8.7" +agp = "8.7.2" +kotlin = "1.9.25" [libraries] android-jain-sip-ri = { module = "javax.sip:android-jain-sip-ri", version.ref = "androidJainSipRi" } @@ -109,5 +112,11 @@ androidx-ui-test-junit4 = { group = "androidx.compose.ui", name = "ui-test-junit androidx-material3 = { group = "androidx.compose.material3", name = "material3" } lifecycle-process = { group = "androidx.lifecycle", name = "lifecycle-process", version.ref = "lifecycleProcess" } +detekt-api = { module = "io.gitlab.arturbosch.detekt:detekt-api", version.ref = "detekt" } +detekt-test = { module = "io.gitlab.arturbosch.detekt:detekt-test", version.ref = "detekt" } + [plugins] +detekt = { id = "io.gitlab.arturbosch.detekt", version.ref = "detekt" } +android-library = { id = "com.android.library", version.ref = "agp" } +kotlin-android = { id = "org.jetbrains.kotlin.android", version.ref = "kotlin" } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/audio/PreconnectAudioBuffer.kt b/livekit-android-sdk/src/main/java/io/livekit/android/audio/PreconnectAudioBuffer.kt index 7aa732461..2b357773d 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/audio/PreconnectAudioBuffer.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/audio/PreconnectAudioBuffer.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,9 +24,11 @@ import io.livekit.android.events.collect import io.livekit.android.room.ConnectionState import io.livekit.android.room.Room import io.livekit.android.room.datastream.StreamBytesOptions +import io.livekit.android.room.datastream.outgoing.useStreamSender import io.livekit.android.room.participant.Participant import io.livekit.android.util.LKLog import io.livekit.android.util.flow +import io.livekit.android.util.rethrowIfCancellationSignal import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.cancel @@ -150,14 +152,15 @@ internal constructor(timeout: Duration) : AudioTrackSink { ), ) - try { - val result = sender.write(audioData) + val sendResult = useStreamSender(sender) { + val result = write(audioData) if (result.isFailure) { result.exceptionOrNull()?.let { throw it } } - sender.close() - } catch (e: Exception) { - sender.close(e.localizedMessage) + close() + } + if (sendResult.isFailure) { + return } val samples = audioData.size / (numberOfChannels * bitsPerSample / 8) @@ -257,6 +260,7 @@ suspend fun Room.withPreconnectAudio( ) sentIdentities.add(identity) } catch (e: Exception) { + e.rethrowIfCancellationSignal() LKLog.w(e) { "Error occurred while sending the audio preconnect data." } onError?.invoke(e) } @@ -295,6 +299,7 @@ suspend fun Room.withPreconnectAudio( try { retValue = operation.invoke() } catch (e: Exception) { + e.rethrowIfCancellationSignal() cancel() throw e } @@ -361,6 +366,7 @@ internal suspend fun Room.startPreconnectAudioJob( ) sentIdentities.add(identity) } catch (e: Exception) { + e.rethrowIfCancellationSignal() LKLog.w(e) { "Error occurred while sending the audio preconnect data." } } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index c35761513..ed1f5fbe4 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -47,7 +47,9 @@ import io.livekit.android.util.TTLMap import io.livekit.android.util.flow import io.livekit.android.util.flowDelegate import io.livekit.android.util.nullSafe +import io.livekit.android.util.rethrowIfCancellationSignal import io.livekit.android.util.withCheckLock +import io.livekit.android.util.withDeadline import io.livekit.android.webrtc.DataChannelManager import io.livekit.android.webrtc.DataPacketBuffer import io.livekit.android.webrtc.DataPacketItem @@ -71,7 +73,6 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeoutOrNull import kotlinx.coroutines.yield import livekit.LivekitModels @@ -397,7 +398,7 @@ internal constructor( } // Suspend until signal client receives message confirming track publication. - return withTimeout(20.seconds) { + return withDeadline(20.seconds) { suspendCancellableCoroutine { cont -> synchronized(pendingTrackResolvers) { pendingTrackResolvers[cid] = cont @@ -540,6 +541,7 @@ internal constructor( try { url = regionUrlProvider?.getNextBestRegionUrl() ?: url } catch (e: Exception) { + e.rethrowIfCancellationSignal() LKLog.d(e) { "Exception while getting next best region url while reconnecting." } } } @@ -589,6 +591,7 @@ internal constructor( listener?.onFullReconnecting() joinImpl(url!!, token, connectOptions, lastRoomOptions ?: RoomOptions()) } catch (e: Exception) { + e.rethrowIfCancellationSignal() LKLog.w(e) { "Error during reconnection." } // reconnect failed, retry. continue @@ -612,6 +615,7 @@ internal constructor( } client.onReadyForResponses() } catch (e: Exception) { + e.rethrowIfCancellationSignal() LKLog.w(e) { "Error during reconnection." } // ws reconnect failed, retry. continue @@ -716,6 +720,7 @@ internal constructor( try { ensurePublisherConnected(dataPacket.kind) } catch (e: Exception) { + e.rethrowIfCancellationSignal() return Result.failure(e) } @@ -769,6 +774,7 @@ internal constructor( channel.send(buf) } catch (e: Exception) { + e.rethrowIfCancellationSignal() return Result.failure(e) } return Result.success(Unit) @@ -802,6 +808,7 @@ internal constructor( try { ensurePublisherConnected(kind) } catch (e: Exception) { + e.rethrowIfCancellationSignal() return } val manager = dataChannelManagerForKind(kind) ?: return diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt index 77de0cfb6..c6c28399d 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt @@ -75,6 +75,7 @@ import io.livekit.android.util.LKLog import io.livekit.android.util.flow import io.livekit.android.util.flowDelegate import io.livekit.android.util.invoke +import io.livekit.android.util.rethrowIfCancellationSignal import io.livekit.android.webrtc.getFilteredStats import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineDispatcher @@ -414,6 +415,7 @@ constructor( connectionWarmer.fetch(url) } } catch (e: Exception) { + e.rethrowIfCancellationSignal() LKLog.e(e) { "Error while preparing connection:" } } } @@ -498,6 +500,7 @@ constructor( try { regionUrlProvider?.fetchRegionSettings() } catch (e: Exception) { + e.rethrowIfCancellationSignal() LKLog.w(e) { "could not fetch region settings" } } } @@ -513,9 +516,7 @@ constructor( engine.regionUrlProvider = regionUrlProvider engine.join(connectUrl, token, options, roomOptions) } catch (e: Exception) { - if (e is CancellationException) { - throw e // rethrow to properly cancel. - } + e.rethrowIfCancellationSignal() nextUrl = regionUrlProvider?.getNextBestRegionUrl() if (nextUrl != null) { diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/BaseStreamSender.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/BaseStreamSender.kt index 0fa932eb3..9463bf1f6 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/BaseStreamSender.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/BaseStreamSender.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,10 @@ package io.livekit.android.room.datastream.outgoing import androidx.annotation.CheckResult import io.livekit.android.room.datastream.StreamException +import io.livekit.android.util.LKLog +import io.livekit.android.util.rethrowIfCancellationSignal +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.withContext abstract class BaseStreamSender( internal val destination: StreamDestination, @@ -61,3 +65,36 @@ interface StreamDestination { * @suppress */ typealias DataChunker = (data: T, chunkSize: Int) -> List + +/** + * Runs [block] with [sender], then ensures [sender] is closed afterwards if it is still open. + * + * On success, [block] should still attempt to close [sender] when the stream is + * finished normally. If it is left open, any exceptions thrown by [sender.close] + * will be ignored. + */ +@CheckResult +suspend inline fun , R> useStreamSender( + sender: S, + block: suspend S.() -> R, +): Result { + var abnormalCloseReason: String? = null + try { + return Result.success(sender.block()) + } catch (e: Exception) { + abnormalCloseReason = e.localizedMessage + e.rethrowIfCancellationSignal() + return Result.failure(e) + } finally { + if (sender.isOpen) { + try { + withContext(NonCancellable) { + sender.close(abnormalCloseReason) + } + } catch (closeException: Exception) { + // Best-effort cleanup; must not mask pending cancellation or errors. + LKLog.w(closeException) { "Error when closing sender:" } + } + } + } +} diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/ByteStreamSender.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/ByteStreamSender.kt index 95a7eb6db..a4bae70f6 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/ByteStreamSender.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/ByteStreamSender.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package io.livekit.android.room.datastream.outgoing import androidx.annotation.CheckResult import io.livekit.android.room.datastream.ByteStreamInfo +import io.livekit.android.util.rethrowIfCancellationSignal import okio.Buffer import okio.FileSystem import okio.Path.Companion.toOkioPath @@ -108,6 +109,7 @@ suspend fun ByteStreamSender.write(source: Source): Result { Result.success(Unit) } } catch (e: Exception) { + e.rethrowIfCancellationSignal() Result.failure(e) } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/OutgoingDataStreamManager.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/OutgoingDataStreamManager.kt index dcdf557b5..6058efd67 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/OutgoingDataStreamManager.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/OutgoingDataStreamManager.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,20 +62,13 @@ interface OutgoingDataStreamManager { */ @CheckResult suspend fun sendText(text: String, options: StreamTextOptions = StreamTextOptions()): Result { - try { - val sender = streamText(options) - val result = sender.write(text) - + return useStreamSender(streamText(options)) { + val result = write(text) if (result.isFailure) { - val exception = result.exceptionOrNull() ?: Exception("Unknown error.") - sender.close(exception.message) - return Result.failure(exception) - } else { - sender.close() - return Result.success(sender.info) + throw (result.exceptionOrNull() ?: Exception("Unknown error.")) } - } catch (e: Exception) { - return Result.failure(e) + close() + return@useStreamSender info } } @@ -84,20 +77,13 @@ interface OutgoingDataStreamManager { */ @CheckResult suspend fun sendFile(file: File, options: StreamBytesOptions = StreamBytesOptions()): Result { - try { - val sender = streamBytes(options) - val result = sender.writeFile(file) - + return useStreamSender(streamBytes(options)) { + val result = writeFile(file) if (result.isFailure) { - val exception = result.exceptionOrNull() ?: Exception("Unknown error.") - sender.close(exception.message) - return Result.failure(exception) - } else { - sender.close() - return Result.success(sender.info) + throw (result.exceptionOrNull() ?: Exception("Unknown error.")) } - } catch (e: Exception) { - return Result.failure(e) + close() + return@useStreamSender info } } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/metrics/RTCMetricsManager.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/metrics/RTCMetricsManager.kt index 2f260d26b..306c3b4c1 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/metrics/RTCMetricsManager.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/metrics/RTCMetricsManager.kt @@ -1,5 +1,5 @@ /* - * Copyright 2024-2025 LiveKit, Inc. + * Copyright 2024-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import io.livekit.android.room.RTCEngine import io.livekit.android.room.Room import io.livekit.android.room.participant.Participant import io.livekit.android.util.LKLog +import io.livekit.android.util.rethrowIfCancellationSignal import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.delay @@ -73,6 +74,7 @@ private suspend fun collectPublisherMetrics(room: Room, rtcEngine: RTCEngine) { throw it } } catch (e: Exception) { + e.rethrowIfCancellationSignal() LKLog.i(e) { "Error sending metrics: " } } } @@ -106,6 +108,7 @@ private suspend fun collectSubscriberMetrics(room: Room, rtcEngine: RTCEngine) { throw it } } catch (e: Exception) { + e.rethrowIfCancellationSignal() LKLog.i(e) { "Error sending metrics: " } } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt index 18b8a1b23..8ef031dee 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt @@ -58,6 +58,7 @@ import io.livekit.android.rpc.RpcError import io.livekit.android.util.LKLog import io.livekit.android.util.byteLength import io.livekit.android.util.flow +import io.livekit.android.util.rethrowIfCancellationSignal import io.livekit.android.webrtc.sortVideoCodecPreferences import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Job @@ -735,6 +736,7 @@ internal constructor( ) } catch (e: Exception) { onPublishFailure(TrackException.PublishException("Failed to publish track", e)) + e.rethrowIfCancellationSignal() null } } @@ -1364,6 +1366,7 @@ internal constructor( responsePayload = response } } catch (e: Exception) { + e.rethrowIfCancellationSignal() if (e is RpcError) { responseError = e } else { @@ -1553,6 +1556,7 @@ internal constructor( val trackInfo = publishJob.await() LKLog.d { "published $codec for track ${track.sid}, $trackInfo" } } catch (e: Exception) { + e.rethrowIfCancellationSignal() LKLog.w(e) { "exception when publishing $codec for track ${track.sid}" } } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/token/CustomTokenSource.kt b/livekit-android-sdk/src/main/java/io/livekit/android/token/CustomTokenSource.kt index 44720b572..08e7b3912 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/token/CustomTokenSource.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/token/CustomTokenSource.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,14 @@ package io.livekit.android.token +import io.livekit.android.util.rethrowIfCancellationSignal + internal class CustomTokenSource(val block: suspend (options: TokenRequestOptions) -> Result) : ConfigurableTokenSource { override suspend fun fetch(options: TokenRequestOptions): Result { return try { block(options) } catch (e: Throwable) { + e.rethrowIfCancellationSignal() Result.failure(e) } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/token/EndpointTokenSource.kt b/livekit-android-sdk/src/main/java/io/livekit/android/token/EndpointTokenSource.kt index a3e1e5d55..b68886c0b 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/token/EndpointTokenSource.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/token/EndpointTokenSource.kt @@ -17,6 +17,7 @@ package io.livekit.android.token import io.livekit.android.dagger.globalOkHttpClient +import io.livekit.android.util.rethrowIfCancellationSignal import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.decodeFromString @@ -130,6 +131,7 @@ internal interface EndpointTokenSource : ConfigurableTokenSource { }, ) } catch (e: Exception) { + e.rethrowIfCancellationSignal() continuation.resume(Result.failure(e)) } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/util/CoroutineUtil.kt b/livekit-android-sdk/src/main/java/io/livekit/android/util/CoroutineUtil.kt index dc1e841f5..5c111610e 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/util/CoroutineUtil.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/util/CoroutineUtil.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,17 @@ package io.livekit.android.util -import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.withTimeout +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract +import kotlin.time.Duration internal fun debounce( waitMs: Long = 300L, @@ -36,3 +46,36 @@ internal fun debounce( internal fun ((Unit) -> R).invoke() { this.invoke(Unit) } +class TimeoutException(cause: Exception) : Exception(cause) + +/** + * A replacement for [withTimeout], as it throws a [TimeoutCancellationException], which is + * a subclass of [CancellationException], and will cancel a coroutine entirely. + * + * This catches the [TimeoutCancellationException], and rethrows a [TimeoutException]. + * + * See the following for context: + * * [https://github.com/Kotlin/kotlinx.coroutines/issues/1374](https://github.com/Kotlin/kotlinx.coroutines/issues/1374) + * * [https://github.com/Kotlin/kotlinx.coroutines/pull/4356](https://github.com/Kotlin/kotlinx.coroutines/pull/4356) + * @throws TimeoutException if the [timeout] is exceeded. + */ + +@Throws(TimeoutException::class) +@OptIn(ExperimentalContracts::class) +@Suppress("NoWithTimeout") // Only allowed call site: maps TimeoutCancellationException to TimeoutException (see KDoc). +internal suspend fun withDeadline(timeout: Duration, block: suspend () -> T): T { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) + } + try { + return withTimeout(timeout) { block() } + } catch (e: TimeoutCancellationException) { + throw TimeoutException(e) + } +} + +fun Throwable.rethrowIfCancellationSignal() { + if (this is CancellationException) { + throw this + } +} diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt index 79f4fa0d9..00b5ca035 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt @@ -821,11 +821,15 @@ class LocalParticipantMockE2ETest : MockE2ETest() { success = room.localParticipant.publishVideoTrack(createLocalTrack()) } catch (e: TrackException.PublishException) { didThrow = true + } catch (e: Exception) { + didThrow = true } } coroutineRule.dispatcher.scheduler.advanceUntilIdle() - assertTrue(!didThrow && success == false) + assertTrue(!didThrow) + assertTrue(success != null) + assertFalse(success!!) } @Test diff --git a/livekit-detekt-rules/.gitignore b/livekit-detekt-rules/.gitignore new file mode 100644 index 000000000..348c102af --- /dev/null +++ b/livekit-detekt-rules/.gitignore @@ -0,0 +1,2 @@ +/build +/bin diff --git a/livekit-detekt-rules/README.md b/livekit-detekt-rules/README.md new file mode 100644 index 000000000..2d7b1e03f --- /dev/null +++ b/livekit-detekt-rules/README.md @@ -0,0 +1,3 @@ +# livekit-detekt-rules + +Custom detekt rules for the LiveKit Android SDK codebase. diff --git a/livekit-detekt-rules/build.gradle b/livekit-detekt-rules/build.gradle new file mode 100644 index 000000000..56371a330 --- /dev/null +++ b/livekit-detekt-rules/build.gradle @@ -0,0 +1,25 @@ +plugins { + id 'java-library' + id 'kotlin' +} + +java { + sourceCompatibility = java_version + targetCompatibility = java_version +} + +tasks.withType(org.jetbrains.kotlin.gradle.tasks.BaseKotlinCompile).configureEach { + kotlinOptions { + jvmTarget = java_version + } +} + +dependencies { + compileOnly libs.detekt.api + testImplementation libs.detekt.test + testImplementation libs.junit +} + +test { + useJUnit() +} diff --git a/livekit-detekt-rules/src/main/java/io/livekit/detekt/LivekitRuleSetProvider.kt b/livekit-detekt-rules/src/main/java/io/livekit/detekt/LivekitRuleSetProvider.kt new file mode 100644 index 000000000..fc5fa5a9c --- /dev/null +++ b/livekit-detekt-rules/src/main/java/io/livekit/detekt/LivekitRuleSetProvider.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2026 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.detekt + +import io.gitlab.arturbosch.detekt.api.Config +import io.gitlab.arturbosch.detekt.api.RuleSet +import io.gitlab.arturbosch.detekt.api.RuleSetProvider + +class LivekitRuleSetProvider : RuleSetProvider { + override val ruleSetId: String = "livekit-rules" + + override fun instance(config: Config): RuleSet { + return RuleSet( + ruleSetId, + listOf( + NoWithTimeout(config), + ), + ) + } +} diff --git a/livekit-detekt-rules/src/main/java/io/livekit/detekt/NoWithTimeout.kt b/livekit-detekt-rules/src/main/java/io/livekit/detekt/NoWithTimeout.kt new file mode 100644 index 000000000..162a651b0 --- /dev/null +++ b/livekit-detekt-rules/src/main/java/io/livekit/detekt/NoWithTimeout.kt @@ -0,0 +1,106 @@ +/* + * Copyright 2026 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.detekt + +import io.gitlab.arturbosch.detekt.api.CodeSmell +import io.gitlab.arturbosch.detekt.api.Config +import io.gitlab.arturbosch.detekt.api.Debt +import io.gitlab.arturbosch.detekt.api.Entity +import io.gitlab.arturbosch.detekt.api.Issue +import io.gitlab.arturbosch.detekt.api.Rule +import io.gitlab.arturbosch.detekt.api.Severity +import org.jetbrains.kotlin.psi.KtCallExpression +import org.jetbrains.kotlin.psi.KtDotQualifiedExpression +import org.jetbrains.kotlin.psi.KtFile +import org.jetbrains.kotlin.psi.KtImportDirective +import org.jetbrains.kotlin.psi.KtNameReferenceExpression + +/** + * Detects kotlinx.coroutines `withTimeout` calls. + */ +class NoWithTimeout(config: Config) : Rule(config) { + override val issue = Issue( + "NoWithTimeout", + Severity.Defect, + "withTimeout cancels the whole coroutine on timeout; use withDeadline or withTimeoutOrNull instead.", + Debt.FIVE_MINS, + ) + + override fun visitCallExpression(expression: KtCallExpression) { + super.visitCallExpression(expression) + if (!isKotlinxWithTimeout(expression)) return + report( + CodeSmell( + issue, + Entity.from(expression), + "Do not use kotlinx.coroutines.withTimeout.", + ), + ) + } + + private fun isKotlinxWithTimeout(expression: KtCallExpression): Boolean { + val callee = expression.calleeExpression ?: return false + return when (callee) { + is KtNameReferenceExpression -> { + if (callee.getReferencedName() != "withTimeout") return false + expression.containingKtFile.importsKotlinxWithTimeout() + } + is KtDotQualifiedExpression -> callee.isKotlinxCoroutinesWithTimeout() + else -> false + } + } + + private fun KtFile.importsKotlinxWithTimeout(): Boolean { + if (importDirectives.any { it.importsKotlinxWithTimeout() }) { + return true + } + // Fallback for environments where import PSI is incomplete. + return text.lineSequence() + .map { it.substringBefore("//").trim() } + .any { line -> + line == "import kotlinx.coroutines.withTimeout" || + line.startsWith("import kotlinx.coroutines.withTimeout as ") || + line == "import kotlinx.coroutines.*" + } + } + + private fun KtImportDirective.importsKotlinxWithTimeout(): Boolean { + if (isAllUnder && importedFqName?.asString() == "kotlinx.coroutines") { + return true + } + if (!isAllUnder && importedFqName?.asString() == "kotlinx.coroutines.withTimeout") { + return true + } + val path = importPath?.pathStr + return path == "kotlinx.coroutines.withTimeout" || path == "kotlinx.coroutines.*" + } + + private fun KtDotQualifiedExpression.isKotlinxCoroutinesWithTimeout(): Boolean { + val selector = selectorExpression as? KtNameReferenceExpression ?: return false + if (selector.getReferencedName() != "withTimeout") return false + if (text.removeSurrounding("`") == "kotlinx.coroutines.withTimeout") return true + // Nested: kotlinx.coroutines.withTimeout(…) — not a single DotQualified string in all PSI versions. + val recv = receiverExpression + if (recv is KtDotQualifiedExpression) { + val coroutines = recv.selectorExpression as? KtNameReferenceExpression + val kotlinx = recv.receiverExpression as? KtNameReferenceExpression + return kotlinx?.getReferencedName() == "kotlinx" && + coroutines?.getReferencedName() == "coroutines" + } + return false + } +} diff --git a/livekit-detekt-rules/src/main/resources/META-INF/services/io.gitlab.arturbosch.detekt.api.RuleSetProvider b/livekit-detekt-rules/src/main/resources/META-INF/services/io.gitlab.arturbosch.detekt.api.RuleSetProvider new file mode 100644 index 000000000..349ab0c89 --- /dev/null +++ b/livekit-detekt-rules/src/main/resources/META-INF/services/io.gitlab.arturbosch.detekt.api.RuleSetProvider @@ -0,0 +1 @@ +io.livekit.detekt.LivekitRuleSetProvider diff --git a/livekit-detekt-rules/src/test/java/io/livekit/detekt/NoWithTimeoutTest.kt b/livekit-detekt-rules/src/test/java/io/livekit/detekt/NoWithTimeoutTest.kt new file mode 100644 index 000000000..5a6ace630 --- /dev/null +++ b/livekit-detekt-rules/src/test/java/io/livekit/detekt/NoWithTimeoutTest.kt @@ -0,0 +1,122 @@ +/* + * Copyright 2026 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.detekt + +import io.gitlab.arturbosch.detekt.api.Config +import io.gitlab.arturbosch.detekt.test.lint +import org.junit.Assert.assertEquals +import org.junit.Test + +class NoWithTimeoutTest { + + private val subject = NoWithTimeout(Config.empty) + + @Test + fun `reports withTimeout with explicit import`() { + val code = """ + import kotlinx.coroutines.withTimeout + + suspend fun f() { + withTimeout(1L) { } + } + """.trimIndent() + val findings = subject.lint(code) + assertEquals(1, findings.size) + } + + @Test + fun `reports withTimeout with Int millis and delay in block like CoroutineUtil`() { + val code = """ + import kotlinx.coroutines.delay + import kotlinx.coroutines.withTimeout + + suspend fun testTimeout() { + withTimeout(1000) { + delay(2000) + } + } + """.trimIndent() + val findings = subject.lint(code) + assertEquals(1, findings.size) + } + + @Test + fun `source text fallback matches import when PSI import list is typical`() { + // Same as CoroutineUtil: explicit import line + withTimeout(1000) { delay } + val code = """ + package io.example + import kotlinx.coroutines.delay + import kotlinx.coroutines.withTimeout + + suspend fun testTimeout() { + withTimeout(1000) { delay(2000) } + } + """.trimIndent() + assertEquals(1, subject.lint(code).size) + } + + @Test + fun `reports withTimeout with star import`() { + val code = """ + import kotlinx.coroutines.* + + suspend fun f() { + withTimeout(1L) { } + } + """.trimIndent() + val findings = subject.lint(code) + assertEquals(1, findings.size) + } + + @Test + fun `reports each withTimeout in the same file`() { + val code = """ + import kotlinx.coroutines.withTimeout + + suspend fun a() { withTimeout(1L) { } } + suspend fun b() { withTimeout(2L) { } } + """.trimIndent() + val findings = subject.lint(code) + assertEquals(2, findings.size) + } + + @Test + fun `ignores withTimeoutOrNull`() { + val code = """ + import kotlinx.coroutines.withTimeoutOrNull + + suspend fun f() { + withTimeoutOrNull(1L) { } + } + """.trimIndent() + val findings = subject.lint(code) + assertEquals(0, findings.size) + } + + @Test + fun `ignores unqualified withTimeout without imports`() { + val code = """ + class C { + fun f() { + withTimeout(1L) { } + } + } + """.trimIndent() + val findings = subject.lint(code) + assertEquals(0, findings.size) + } +} diff --git a/settings.gradle b/settings.gradle index 6af64cf19..b5882cbb9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -28,6 +28,7 @@ include ':sample-app-basic' include ':sample-app-record-local' include ':examples:virtual-background' include ':livekit-android-test' +include ':livekit-detekt-rules' include ':livekit-android-camerax' include ':examples:screenshare-audio' include ':livekit-android-track-processors'