Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/breezy-mugs-raise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Fix potential leak for StreamSender caused by exceptions
5 changes: 5 additions & 0 deletions .changeset/eight-numbers-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Rethrow cancellation exceptions for coroutines
3 changes: 0 additions & 3 deletions .github/workflows/android.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ jobs:
- name: Grant execute permission for gradlew
run: chmod +x gradlew

- name: Gradle clean
run: ./gradlew clean

- name: Spotless check
if: github.event_name == 'pull_request'
run: |
Expand Down
25 changes: 25 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ buildscript {

plugins {
id("io.github.gradle-nexus.publish-plugin") version "2.0.0"
alias(libs.plugins.detekt) apply false
}

subprojects {
Expand All @@ -32,6 +33,8 @@ subprojects {
return
}

def isDetektRulesModule = project.name == "livekit-detekt-rules"

apply plugin: "com.diffplug.spotless"
spotless {
// optional: limit format enforcement to just the files changed by this feature branch
Expand Down Expand Up @@ -75,6 +78,28 @@ subprojects {
}
}

if (!isDetektRulesModule) {
apply plugin: "io.gitlab.arturbosch.detekt"
detekt {
parallel = true
buildUponDefaultConfig = true
allRules = false
config.from(files("$rootDir/config/detekt/detekt.yml"))
reports {
html.required.set(true)
md.required.set(true)
}
}

dependencies {
detektPlugins project(":livekit-detekt-rules")
}

tasks.withType(io.gitlab.arturbosch.detekt.Detekt).configureEach {
pluginClasspath.setFrom(configurations.detektPlugins)
}
}

// From Gradle 8 onwards, Kapt no longer automatically picks up jvmTarget
// from normal KotlinOptions. Must be manually set.
// JvmToolchain should not be used since it changes the actual JDK used.
Expand Down
23 changes: 23 additions & 0 deletions config/detekt/detekt.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Detekt: smell-oriented checks only. Formatting and naming conventions are handled by Spotless/ktlint.
config:
validation: true
warningsAsErrors: true
excludes: ''

style:
active: false

naming:
active: false

coroutines:
SuspendFunSwallowedCancellation:
active: true

livekit-rules:
NoWithTimeout:
active: true

exceptions:
TooGenericExceptionCaught:
active: false
14 changes: 9 additions & 5 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ android.enableJetifier=false
kotlin.code.style=official

org.gradle.caching=true
org.gradle.configuration-cache=true
org.gradle.configureondemand=true
org.gradle.daemon=true

kotlin.incremental=true
kapt.incremental.apt=true
kapt.use.worker.api=true


###############################################################

GROUP=io.livekit
Expand Down Expand Up @@ -62,8 +71,3 @@ RELEASE_SIGNING_ENABLED=true
livekitUrl=
livekitApiKey=
livekitApiSecret=

org.gradle.configuration-cache=true
kotlin.incremental=true
org.gradle.configureondemand=true
org.gradle.daemon=true
9 changes: 9 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ androidx-lifecycle = "2.8.0"
audioswitch = "70efa204dda4f468f989b4b3e1b5ecb6ffe56ceb"
autoService = '1.0.1'
coroutines = "1.6.0"
detekt = "1.23.8"
dagger = "2.46"
groupie = "2.9.0"
junit-lib = "4.13.2"
Expand All @@ -29,6 +30,8 @@ material = "1.12.0"
viewpager2 = "1.0.0"
noise = "2.0.0"
lifecycleProcess = "2.8.7"
agp = "8.7.2"
kotlin = "1.9.25"

[libraries]
android-jain-sip-ri = { module = "javax.sip:android-jain-sip-ri", version.ref = "androidJainSipRi" }
Expand Down Expand Up @@ -109,5 +112,11 @@ androidx-ui-test-junit4 = { group = "androidx.compose.ui", name = "ui-test-junit
androidx-material3 = { group = "androidx.compose.material3", name = "material3" }
lifecycle-process = { group = "androidx.lifecycle", name = "lifecycle-process", version.ref = "lifecycleProcess" }

detekt-api = { module = "io.gitlab.arturbosch.detekt:detekt-api", version.ref = "detekt" }
detekt-test = { module = "io.gitlab.arturbosch.detekt:detekt-test", version.ref = "detekt" }

[plugins]
detekt = { id = "io.gitlab.arturbosch.detekt", version.ref = "detekt" }
android-library = { id = "com.android.library", version.ref = "agp" }
kotlin-android = { id = "org.jetbrains.kotlin.android", version.ref = "kotlin" }

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,9 +24,11 @@ import io.livekit.android.events.collect
import io.livekit.android.room.ConnectionState
import io.livekit.android.room.Room
import io.livekit.android.room.datastream.StreamBytesOptions
import io.livekit.android.room.datastream.outgoing.useStreamSender
import io.livekit.android.room.participant.Participant
import io.livekit.android.util.LKLog
import io.livekit.android.util.flow
import io.livekit.android.util.rethrowIfCancellationSignal
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
Expand Down Expand Up @@ -150,14 +152,15 @@ internal constructor(timeout: Duration) : AudioTrackSink {
),
)

try {
val result = sender.write(audioData)
val sendResult = useStreamSender(sender) {
val result = write(audioData)
if (result.isFailure) {
result.exceptionOrNull()?.let { throw it }
}
sender.close()
} catch (e: Exception) {
sender.close(e.localizedMessage)
close()
}
if (sendResult.isFailure) {
return
}

val samples = audioData.size / (numberOfChannels * bitsPerSample / 8)
Expand Down Expand Up @@ -257,6 +260,7 @@ suspend fun <T> Room.withPreconnectAudio(
)
sentIdentities.add(identity)
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
LKLog.w(e) { "Error occurred while sending the audio preconnect data." }
onError?.invoke(e)
}
Expand Down Expand Up @@ -295,6 +299,7 @@ suspend fun <T> Room.withPreconnectAudio(
try {
retValue = operation.invoke()
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
cancel()
throw e
}
Expand Down Expand Up @@ -361,6 +366,7 @@ internal suspend fun Room.startPreconnectAudioJob(
)
sentIdentities.add(identity)
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
LKLog.w(e) { "Error occurred while sending the audio preconnect data." }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ import io.livekit.android.util.TTLMap
import io.livekit.android.util.flow
import io.livekit.android.util.flowDelegate
import io.livekit.android.util.nullSafe
import io.livekit.android.util.rethrowIfCancellationSignal
import io.livekit.android.util.withCheckLock
import io.livekit.android.util.withDeadline
import io.livekit.android.webrtc.DataChannelManager
import io.livekit.android.webrtc.DataPacketBuffer
import io.livekit.android.webrtc.DataPacketItem
Expand All @@ -71,7 +73,6 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.coroutines.yield
import livekit.LivekitModels
Expand Down Expand Up @@ -397,7 +398,7 @@ internal constructor(
}

// Suspend until signal client receives message confirming track publication.
return withTimeout(20.seconds) {
return withDeadline(20.seconds) {
suspendCancellableCoroutine { cont ->
synchronized(pendingTrackResolvers) {
pendingTrackResolvers[cid] = cont
Expand Down Expand Up @@ -540,6 +541,7 @@ internal constructor(
try {
url = regionUrlProvider?.getNextBestRegionUrl() ?: url
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
LKLog.d(e) { "Exception while getting next best region url while reconnecting." }
}
}
Expand Down Expand Up @@ -589,6 +591,7 @@ internal constructor(
listener?.onFullReconnecting()
joinImpl(url!!, token, connectOptions, lastRoomOptions ?: RoomOptions())
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
LKLog.w(e) { "Error during reconnection." }
// reconnect failed, retry.
continue
Expand All @@ -612,6 +615,7 @@ internal constructor(
}
client.onReadyForResponses()
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
LKLog.w(e) { "Error during reconnection." }
// ws reconnect failed, retry.
continue
Expand Down Expand Up @@ -716,6 +720,7 @@ internal constructor(
try {
ensurePublisherConnected(dataPacket.kind)
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
return Result.failure(e)
}

Expand Down Expand Up @@ -769,6 +774,7 @@ internal constructor(

channel.send(buf)
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
return Result.failure(e)
}
return Result.success(Unit)
Expand Down Expand Up @@ -802,6 +808,7 @@ internal constructor(
try {
ensurePublisherConnected(kind)
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
return
}
val manager = dataChannelManagerForKind(kind) ?: return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import io.livekit.android.util.LKLog
import io.livekit.android.util.flow
import io.livekit.android.util.flowDelegate
import io.livekit.android.util.invoke
import io.livekit.android.util.rethrowIfCancellationSignal
import io.livekit.android.webrtc.getFilteredStats
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
Expand Down Expand Up @@ -414,6 +415,7 @@ constructor(
connectionWarmer.fetch(url)
}
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
LKLog.e(e) { "Error while preparing connection:" }
}
}
Expand Down Expand Up @@ -498,6 +500,7 @@ constructor(
try {
regionUrlProvider?.fetchRegionSettings()
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
LKLog.w(e) { "could not fetch region settings" }
}
}
Expand All @@ -513,9 +516,7 @@ constructor(
engine.regionUrlProvider = regionUrlProvider
engine.join(connectUrl, token, options, roomOptions)
} catch (e: Exception) {
if (e is CancellationException) {
throw e // rethrow to properly cancel.
}
e.rethrowIfCancellationSignal()

nextUrl = regionUrlProvider?.getNextBestRegionUrl()
if (nextUrl != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,10 @@ package io.livekit.android.room.datastream.outgoing

import androidx.annotation.CheckResult
import io.livekit.android.room.datastream.StreamException
import io.livekit.android.util.LKLog
import io.livekit.android.util.rethrowIfCancellationSignal
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext

abstract class BaseStreamSender<T>(
internal val destination: StreamDestination<T>,
Expand Down Expand Up @@ -61,3 +65,36 @@ interface StreamDestination<T> {
* @suppress
*/
typealias DataChunker<T> = (data: T, chunkSize: Int) -> List<ByteArray>

/**
* Runs [block] with [sender], then ensures [sender] is closed afterwards if it is still open.
*
* On success, [block] should still attempt to close [sender] when the stream is
* finished normally. If it is left open, any exceptions thrown by [sender.close]
* will be ignored.
*/
@CheckResult
suspend inline fun <S : BaseStreamSender<*>, R> useStreamSender(
sender: S,
block: suspend S.() -> R,
): Result<R> {
var abnormalCloseReason: String? = null
try {
return Result.success(sender.block())
} catch (e: Exception) {
abnormalCloseReason = e.localizedMessage
e.rethrowIfCancellationSignal()
return Result.failure(e)
} finally {
if (sender.isOpen) {
try {
withContext(NonCancellable) {
sender.close(abnormalCloseReason)
}
} catch (closeException: Exception) {
// Best-effort cleanup; must not mask pending cancellation or errors.
LKLog.w(closeException) { "Error when closing sender:" }
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@ package io.livekit.android.room.datastream.outgoing

import androidx.annotation.CheckResult
import io.livekit.android.room.datastream.ByteStreamInfo
import io.livekit.android.util.rethrowIfCancellationSignal
import okio.Buffer
import okio.FileSystem
import okio.Path.Companion.toOkioPath
Expand Down Expand Up @@ -108,6 +109,7 @@ suspend fun ByteStreamSender.write(source: Source): Result<Unit> {
Result.success(Unit)
}
} catch (e: Exception) {
e.rethrowIfCancellationSignal()
Result.failure(e)
}
}
Loading
Loading