From 72ecf3732accc89fe140560efab081f073caced4 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Wed, 18 Feb 2026 10:04:44 +0100 Subject: [PATCH 1/3] feat(core): add stateScope for StateFlows that survive leave() Introduce a separate stateScope that lives for the entire Call object lifetime and is NOT cancelled on leave(). This allows StateFlows using stateIn() to survive leave() and continue receiving coordinator updates. The existing scope behavior is preserved - it's still cancelled on leave() to clean up RTC-related coroutines. Only the stateIn() flows in CallState, CallStats, and ParticipantState now use stateScope. This approach (Rahul's Approach 3) provides structural safety: - Existing cancellation behavior preserved (scope cancelled on leave) - If a stateIn is missed, it dies on leave - safe default, no zombies - StateFlows that need updates after leave() explicitly use stateScope --- .../api/stream-video-android-core.api | 2 +- .../io/getstream/video/android/core/Call.kt | 9 +++++++- .../getstream/video/android/core/CallState.kt | 22 +++++++++++++------ 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/stream-video-android-core/api/stream-video-android-core.api b/stream-video-android-core/api/stream-video-android-core.api index 8f0d9ab1a9..2c901a4f7f 100644 --- a/stream-video-android-core/api/stream-video-android-core.api +++ b/stream-video-android-core/api/stream-video-android-core.api @@ -8687,7 +8687,7 @@ public final class io/getstream/video/android/core/CallKt { } public final class io/getstream/video/android/core/CallState { - public fun (Lio/getstream/video/android/core/StreamVideo;Lio/getstream/video/android/core/Call;Lio/getstream/video/android/model/User;Lkotlinx/coroutines/CoroutineScope;)V + public fun (Lio/getstream/video/android/core/StreamVideo;Lio/getstream/video/android/core/Call;Lio/getstream/video/android/model/User;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineScope;)V public final fun clearParticipants ()V public final fun getAcceptedBy ()Lkotlinx/coroutines/flow/StateFlow; public final fun getActiveSpeakers ()Lkotlinx/coroutines/flow/StateFlow; diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt index 4b0ed353ae..4d06f753b0 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt @@ -166,8 +166,15 @@ public class Call( internal val scope = CoroutineScope(clientImpl.scope.coroutineContext + supervisorJob) + /** + * A long-lived scope for StateFlows that need to survive leave() and receive coordinator updates. + * This scope lives for the entire lifetime of the Call object and is NOT cancelled on leave(). + * Used for stateIn() operators in CallState, CallStats, and ParticipantState. + */ + internal val stateScope = CoroutineScope(clientImpl.scope.coroutineContext + SupervisorJob()) + /** The call state contains all state such as the participant list, reactions etc */ - val state = CallState(client, this, user, scope) + val state = CallState(client, this, user, scope, stateScope) private val network by lazy { clientImpl.coordinatorConnectionModule.networkStateProvider } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt index d6ec114a75..0b839647f3 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt @@ -222,6 +222,12 @@ public class CallState( private val user: User, @InternalStreamVideoApi val scope: CoroutineScope, + /** + * A long-lived scope for StateFlows that need to survive leave() and receive coordinator updates. + * This scope is NOT cancelled on leave() - it lives for the entire Call object lifetime. + */ + @InternalStreamVideoApi + val stateScope: CoroutineScope, ) { private val logger by taggedLogger("CallState") @@ -317,14 +323,14 @@ public class CallState( combined.toMap().asIterable().associate { Pair(it.key, it.value.at) } - }.stateIn(scope, SharingStarted.Eagerly, emptyMap()) + }.stateIn(stateScope, SharingStarted.Eagerly, emptyMap()) /** * Pinned participants, combined value both from server and local pins. */ val pinnedParticipants: StateFlow> = _pinnedParticipants - val stats = CallStats(call, scope) + val stats = CallStats(call, stateScope) private val participantsUpdate = TaskSchedulerWithDebounce() private val participantsUpdateConfig = ScheduleConfig( @@ -413,7 +419,9 @@ public class CallState( ), ) val livestream: StateFlow = - livestreamFlow.debounce(1000).stateIn(scope, SharingStarted.WhileSubscribed(10_000L), null) + livestreamFlow.debounce( + 1000, + ).stateIn(stateScope, SharingStarted.WhileSubscribed(10_000L), null) private var _sortedParticipantsState = SortedParticipantsState( scope, @@ -503,11 +511,11 @@ public class CallState( /** how long the call has been running, rounded to seconds, null if the call didn't start yet */ public val duration: StateFlow = _durationInMs.transform { emit(((it ?: 0L) / 1000L).toDuration(DurationUnit.SECONDS)) } - .stateIn(scope, SharingStarted.WhileSubscribed(10000L), null) + .stateIn(stateScope, SharingStarted.WhileSubscribed(10000L), null) /** how many milliseconds the call has been running, null if the call didn't start yet */ public val durationInMs: StateFlow = - _durationInMs.stateIn(scope, SharingStarted.WhileSubscribed(10000L), null) + _durationInMs.stateIn(stateScope, SharingStarted.WhileSubscribed(10000L), null) /** how many milliseconds the call has been running in the simple date format. */ public val durationInDateFormat: StateFlow = durationInMs.mapState { durationInMs -> @@ -570,7 +578,7 @@ public class CallState( emit(duration) } } - }.distinctUntilChanged().stateIn(scope, SharingStarted.WhileSubscribed(10000L), null) + }.distinctUntilChanged().stateIn(stateScope, SharingStarted.WhileSubscribed(10000L), null) /** * How long the call has been live for, represented as [Duration], or null if the call hasn't been live yet. @@ -1469,7 +1477,7 @@ public class CallState( } else { ParticipantState( sessionId = sessionId, - scope = scope, + scope = stateScope, callActions = callActions, initialUserId = userId, source = source, From afa52123739ba45485b2fc29fcca6123863acbc1 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Wed, 18 Feb 2026 10:09:55 +0100 Subject: [PATCH 2/3] fix(core): remove scope cancellation to allow rejoin after leave Remove shutDownJobsGracefully() which cancelled scope and supervisorJob. This prevented rejoining after leave() since the scope was dead. RTC-related jobs are already manually cancelled in leave(): - sfuEvents, monitorPublisherPCStateJob, monitorSubscriberPCStateJob - leaveTimeoutAfterDisconnect, callStatsReportingJob RtcSession.cleanup() cancels its own supervisorJob, handling its coroutines. --- .../io/getstream/video/android/core/Call.kt | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt index 4d06f753b0..ff2b1f62da 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt @@ -87,8 +87,6 @@ import io.getstream.video.android.core.model.VideoTrack import io.getstream.video.android.core.model.toIceServer import io.getstream.video.android.core.notifications.internal.telecom.TelecomCallController import io.getstream.video.android.core.recording.RecordingType -import io.getstream.video.android.core.socket.common.scope.ClientScope -import io.getstream.video.android.core.socket.common.scope.UserScope import io.getstream.video.android.core.utils.AtomicUnitCall import io.getstream.video.android.core.utils.RampValueUpAndDownHelper import io.getstream.video.android.core.utils.StreamSingleFlightProcessorImpl @@ -1519,7 +1517,11 @@ public class Call( fun cleanup() { // monitor.stop() session?.cleanup() - shutDownJobsGracefully() + // NOTE: We intentionally do NOT cancel scope or supervisorJob here. + // This allows the Call to be reused after leave() - the scope stays alive + // so new coroutines can be launched on rejoin. RTC-related jobs are already + // manually cancelled in leave() (sfuEvents, monitorPublisherPCStateJob, etc.) + // and RtcSession.cleanup() cancels its own supervisorJob. callStatsReportingJob?.cancel() mediaManager.cleanup() // TODO Rahul, Verify Later: need to check which call has owned the media at the moment(probably use active call) session = null @@ -1527,15 +1529,6 @@ public class Call( scopeProvider.cleanup() } - // This will allow the Rest APIs to be executed which are in queue before leave - private fun shutDownJobsGracefully() { - UserScope(ClientScope()).launch { - supervisorJob.children.forEach { it.join() } - supervisorJob.cancel() - } - scope.cancel() - } - suspend fun ring(): Result { logger.d { "[ring] #ringing; no args" } return clientImpl.ring(type, id) From daadfca1a2f53381719a346e1434abb040f103b9 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Wed, 18 Feb 2026 10:16:29 +0100 Subject: [PATCH 3/3] refactor(core): use provider pattern for stateScope access Instead of passing stateScope as constructor parameter: - Add stateScope property to CallActions interface - CallState, CallStats access call.stateScope directly - ParticipantState accesses callActions.stateScope This is cleaner than passing scopes through constructors - each class gets the scope from its existing dependencies. --- .../api/stream-video-android-core.api | 17 ++++++++--------- .../io/getstream/video/android/core/Call.kt | 2 +- .../getstream/video/android/core/CallActions.kt | 7 +++++++ .../getstream/video/android/core/CallState.kt | 13 ++++++++----- .../getstream/video/android/core/CallStats.kt | 4 ++-- .../video/android/core/ParticipantState.kt | 5 +++-- .../android/mock/StreamPreviewDataUtils.kt | 2 -- 7 files changed, 29 insertions(+), 21 deletions(-) diff --git a/stream-video-android-core/api/stream-video-android-core.api b/stream-video-android-core/api/stream-video-android-core.api index 2c901a4f7f..9368af7cbf 100644 --- a/stream-video-android-core/api/stream-video-android-core.api +++ b/stream-video-android-core/api/stream-video-android-core.api @@ -8687,7 +8687,7 @@ public final class io/getstream/video/android/core/CallKt { } public final class io/getstream/video/android/core/CallState { - public fun (Lio/getstream/video/android/core/StreamVideo;Lio/getstream/video/android/core/Call;Lio/getstream/video/android/model/User;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineScope;)V + public fun (Lio/getstream/video/android/core/StreamVideo;Lio/getstream/video/android/core/Call;Lio/getstream/video/android/model/User;Lkotlinx/coroutines/CoroutineScope;)V public final fun clearParticipants ()V public final fun getAcceptedBy ()Lkotlinx/coroutines/flow/StateFlow; public final fun getActiveSpeakers ()Lkotlinx/coroutines/flow/StateFlow; @@ -8781,9 +8781,8 @@ public final class io/getstream/video/android/core/CallState { } public final class io/getstream/video/android/core/CallStats { - public fun (Lio/getstream/video/android/core/Call;Lkotlinx/coroutines/CoroutineScope;)V + public fun (Lio/getstream/video/android/core/Call;)V public final fun getCall ()Lio/getstream/video/android/core/Call; - public final fun getCallScope ()Lkotlinx/coroutines/CoroutineScope; public final fun getLocal ()Lkotlinx/coroutines/flow/StateFlow; public final fun getPublisher ()Lio/getstream/video/android/core/PeerConnectionStats; public final fun getSubscriber ()Lio/getstream/video/android/core/PeerConnectionStats; @@ -9119,14 +9118,14 @@ public final class io/getstream/video/android/core/MicrophoneManager { } public final class io/getstream/video/android/core/ParticipantState { - public fun (Ljava/lang/String;Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;)V - public synthetic fun (Ljava/lang/String;Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Ljava/lang/String;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;)V + public synthetic fun (Ljava/lang/String;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Ljava/lang/String; - public final fun component5 ()Lstream/video/sfu/models/ParticipantSource; - public final fun component6 ()Ljava/lang/String; + public final fun component4 ()Lstream/video/sfu/models/ParticipantSource; + public final fun component5 ()Ljava/lang/String; public final fun consumeReaction (Lio/getstream/video/android/core/model/Reaction;)V - public final fun copy (Ljava/lang/String;Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;)Lio/getstream/video/android/core/ParticipantState; - public static synthetic fun copy$default (Lio/getstream/video/android/core/ParticipantState;Ljava/lang/String;Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;ILjava/lang/Object;)Lio/getstream/video/android/core/ParticipantState; + public final fun copy (Ljava/lang/String;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;)Lio/getstream/video/android/core/ParticipantState; + public static synthetic fun copy$default (Lio/getstream/video/android/core/ParticipantState;Ljava/lang/String;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;ILjava/lang/Object;)Lio/getstream/video/android/core/ParticipantState; public fun equals (Ljava/lang/Object;)Z public final fun getAudio ()Lkotlinx/coroutines/flow/StateFlow; public final fun getAudioEnabled ()Lkotlinx/coroutines/flow/StateFlow; diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt index ff2b1f62da..d20670ba22 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt @@ -172,7 +172,7 @@ public class Call( internal val stateScope = CoroutineScope(clientImpl.scope.coroutineContext + SupervisorJob()) /** The call state contains all state such as the participant list, reactions etc */ - val state = CallState(client, this, user, scope, stateScope) + val state = CallState(client, this, user, scope) private val network by lazy { clientImpl.coordinatorConnectionModule.networkStateProvider } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallActions.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallActions.kt index 87e2dd09b2..a53a6b01fb 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallActions.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallActions.kt @@ -19,12 +19,19 @@ package io.getstream.video.android.core import io.getstream.android.video.generated.models.MuteUsersResponse import io.getstream.result.Result import io.getstream.video.android.core.internal.InternalStreamVideoApi +import kotlinx.coroutines.CoroutineScope /** * Interface for call actions that can be performed on a participant. */ @InternalStreamVideoApi interface CallActions { + /** + * A long-lived scope for StateFlows that need to survive leave() and receive coordinator updates. + * This scope is NOT cancelled on leave() - it lives for the entire Call object lifetime. + */ + val stateScope: CoroutineScope + /** * Mute audio for a specific user */ diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt index 0b839647f3..3faf607beb 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt @@ -222,13 +222,14 @@ public class CallState( private val user: User, @InternalStreamVideoApi val scope: CoroutineScope, +) { /** * A long-lived scope for StateFlows that need to survive leave() and receive coordinator updates. * This scope is NOT cancelled on leave() - it lives for the entire Call object lifetime. + * Accessed via call.stateScope. */ - @InternalStreamVideoApi - val stateScope: CoroutineScope, -) { + private val stateScope: CoroutineScope + get() = call.stateScope private val logger by taggedLogger("CallState") private var participantsVisibilityMonitor: Job? = null @@ -236,6 +237,9 @@ public class CallState( // Create a CallActions implementation that delegates to the Call object @InternalStreamVideoApi val callActions = object : CallActions { + override val stateScope: CoroutineScope + get() = call.stateScope + override suspend fun muteUserAudio(userId: String): Result { return call.muteUser(userId, audio = true, video = false, screenShare = false) } @@ -330,7 +334,7 @@ public class CallState( */ val pinnedParticipants: StateFlow> = _pinnedParticipants - val stats = CallStats(call, stateScope) + val stats = CallStats(call) private val participantsUpdate = TaskSchedulerWithDebounce() private val participantsUpdateConfig = ScheduleConfig( @@ -1477,7 +1481,6 @@ public class CallState( } else { ParticipantState( sessionId = sessionId, - scope = stateScope, callActions = callActions, initialUserId = userId, source = source, diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallStats.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallStats.kt index 9e6ab56d45..293425959e 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallStats.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallStats.kt @@ -81,11 +81,11 @@ public data class LocalStats( val deviceModel: String, ) -public class CallStats(val call: Call, val callScope: CoroutineScope) { +public class CallStats(val call: Call) { private val logger by taggedLogger("CallStats") private val supervisorJob = SupervisorJob() - private val scope = CoroutineScope(callScope.coroutineContext + supervisorJob) + private val scope = CoroutineScope(call.stateScope.coroutineContext + supervisorJob) // TODO: cleanup the scope val publisher = PeerConnectionStats(scope) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ParticipantState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ParticipantState.kt index 7ae7d26a53..34141db862 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ParticipantState.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ParticipantState.kt @@ -52,8 +52,6 @@ import stream.video.sfu.models.TrackType public data class ParticipantState( /** The SFU returns a session id for each participant. This session id is unique */ var sessionId: String = "", - /** The coroutine scope for this participant */ - private val scope: CoroutineScope, /** The call actions interface for performing operations on this participant */ private val callActions: CallActions, /** The current version of the user, this is the start for participant.user stateflow */ @@ -63,6 +61,9 @@ public data class ParticipantState( @InternalStreamVideoApi var trackLookupPrefix: String = "", ) { + /** The coroutine scope for StateFlows - accessed via callActions.stateScope */ + private val scope: CoroutineScope + get() = callActions.stateScope private val logger by taggedLogger("ParticipantState") diff --git a/stream-video-android-previewdata/src/main/kotlin/io/getstream/video/android/mock/StreamPreviewDataUtils.kt b/stream-video-android-previewdata/src/main/kotlin/io/getstream/video/android/mock/StreamPreviewDataUtils.kt index 61f8956e29..8033817833 100644 --- a/stream-video-android-previewdata/src/main/kotlin/io/getstream/video/android/mock/StreamPreviewDataUtils.kt +++ b/stream-video-android-previewdata/src/main/kotlin/io/getstream/video/android/mock/StreamPreviewDataUtils.kt @@ -59,7 +59,6 @@ public val previewCall: Call = Call( ParticipantState( initialUserId = user.id, sessionId = sessionId, - scope = this.state.scope, callActions = this.state.callActions, ) } @@ -129,7 +128,6 @@ public val previewParticipantsList: List ParticipantState( initialUserId = user.id, sessionId = sessionId, - scope = previewCall.state.scope, callActions = previewCall.state.callActions, ).also { previewCall.state.updateParticipant(it) }, )