Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions stream-video-android-core/api/stream-video-android-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -8781,9 +8781,8 @@ public final class io/getstream/video/android/core/CallState {
}

public final class io/getstream/video/android/core/CallStats {
public fun <init> (Lio/getstream/video/android/core/Call;Lkotlinx/coroutines/CoroutineScope;)V
public fun <init> (Lio/getstream/video/android/core/Call;)V
public final fun getCall ()Lio/getstream/video/android/core/Call;
public final fun getCallScope ()Lkotlinx/coroutines/CoroutineScope;
public final fun getLocal ()Lkotlinx/coroutines/flow/StateFlow;
public final fun getPublisher ()Lio/getstream/video/android/core/PeerConnectionStats;
public final fun getSubscriber ()Lio/getstream/video/android/core/PeerConnectionStats;
Expand Down Expand Up @@ -9119,14 +9118,14 @@ public final class io/getstream/video/android/core/MicrophoneManager {
}

public final class io/getstream/video/android/core/ParticipantState {
public fun <init> (Ljava/lang/String;Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;)V
public synthetic fun <init> (Ljava/lang/String;Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Ljava/lang/String;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;)V
public synthetic fun <init> (Ljava/lang/String;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Ljava/lang/String;
public final fun component5 ()Lstream/video/sfu/models/ParticipantSource;
public final fun component6 ()Ljava/lang/String;
public final fun component4 ()Lstream/video/sfu/models/ParticipantSource;
public final fun component5 ()Ljava/lang/String;
public final fun consumeReaction (Lio/getstream/video/android/core/model/Reaction;)V
public final fun copy (Ljava/lang/String;Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;)Lio/getstream/video/android/core/ParticipantState;
public static synthetic fun copy$default (Lio/getstream/video/android/core/ParticipantState;Ljava/lang/String;Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;ILjava/lang/Object;)Lio/getstream/video/android/core/ParticipantState;
public final fun copy (Ljava/lang/String;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;)Lio/getstream/video/android/core/ParticipantState;
public static synthetic fun copy$default (Lio/getstream/video/android/core/ParticipantState;Ljava/lang/String;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;ILjava/lang/Object;)Lio/getstream/video/android/core/ParticipantState;
public fun equals (Ljava/lang/Object;)Z
public final fun getAudio ()Lkotlinx/coroutines/flow/StateFlow;
public final fun getAudioEnabled ()Lkotlinx/coroutines/flow/StateFlow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ import io.getstream.video.android.core.model.VideoTrack
import io.getstream.video.android.core.model.toIceServer
import io.getstream.video.android.core.notifications.internal.telecom.TelecomCallController
import io.getstream.video.android.core.recording.RecordingType
import io.getstream.video.android.core.socket.common.scope.ClientScope
import io.getstream.video.android.core.socket.common.scope.UserScope
import io.getstream.video.android.core.utils.AtomicUnitCall
import io.getstream.video.android.core.utils.RampValueUpAndDownHelper
import io.getstream.video.android.core.utils.StreamSingleFlightProcessorImpl
Expand Down Expand Up @@ -166,6 +164,13 @@ public class Call(

internal val scope = CoroutineScope(clientImpl.scope.coroutineContext + supervisorJob)

/**
* A long-lived scope for StateFlows that need to survive leave() and receive coordinator updates.
* This scope lives for the entire lifetime of the Call object and is NOT cancelled on leave().
* Used for stateIn() operators in CallState, CallStats, and ParticipantState.
*/
internal val stateScope = CoroutineScope(clientImpl.scope.coroutineContext + SupervisorJob())

/** The call state contains all state such as the participant list, reactions etc */
val state = CallState(client, this, user, scope)

Expand Down Expand Up @@ -1512,23 +1517,18 @@ public class Call(
fun cleanup() {
// monitor.stop()
session?.cleanup()
shutDownJobsGracefully()
// NOTE: We intentionally do NOT cancel scope or supervisorJob here.
// This allows the Call to be reused after leave() - the scope stays alive
// so new coroutines can be launched on rejoin. RTC-related jobs are already
// manually cancelled in leave() (sfuEvents, monitorPublisherPCStateJob, etc.)
// and RtcSession.cleanup() cancels its own supervisorJob.
callStatsReportingJob?.cancel()
mediaManager.cleanup() // TODO Rahul, Verify Later: need to check which call has owned the media at the moment(probably use active call)
session = null
// Cleanup the call's scope provider
scopeProvider.cleanup()
}

// This will allow the Rest APIs to be executed which are in queue before leave
private fun shutDownJobsGracefully() {
UserScope(ClientScope()).launch {
supervisorJob.children.forEach { it.join() }
supervisorJob.cancel()
}
scope.cancel()
}

suspend fun ring(): Result<GetCallResponse> {
logger.d { "[ring] #ringing; no args" }
return clientImpl.ring(type, id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ package io.getstream.video.android.core
import io.getstream.android.video.generated.models.MuteUsersResponse
import io.getstream.result.Result
import io.getstream.video.android.core.internal.InternalStreamVideoApi
import kotlinx.coroutines.CoroutineScope

/**
* Interface for call actions that can be performed on a participant.
*/
@InternalStreamVideoApi
interface CallActions {
/**
* A long-lived scope for StateFlows that need to survive leave() and receive coordinator updates.
* This scope is NOT cancelled on leave() - it lives for the entire Call object lifetime.
*/
val stateScope: CoroutineScope

/**
* Mute audio for a specific user
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,23 @@ public class CallState(
@InternalStreamVideoApi
val scope: CoroutineScope,
) {
/**
* A long-lived scope for StateFlows that need to survive leave() and receive coordinator updates.
* This scope is NOT cancelled on leave() - it lives for the entire Call object lifetime.
* Accessed via call.stateScope.
*/
private val stateScope: CoroutineScope
get() = call.stateScope

private val logger by taggedLogger("CallState")
private var participantsVisibilityMonitor: Job? = null

// Create a CallActions implementation that delegates to the Call object
@InternalStreamVideoApi
val callActions = object : CallActions {
override val stateScope: CoroutineScope
get() = call.stateScope

override suspend fun muteUserAudio(userId: String): Result<MuteUsersResponse> {
return call.muteUser(userId, audio = true, video = false, screenShare = false)
}
Expand Down Expand Up @@ -317,14 +327,14 @@ public class CallState(
combined.toMap().asIterable().associate {
Pair(it.key, it.value.at)
}
}.stateIn(scope, SharingStarted.Eagerly, emptyMap())
}.stateIn(stateScope, SharingStarted.Eagerly, emptyMap())

/**
* Pinned participants, combined value both from server and local pins.
*/
val pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> = _pinnedParticipants

val stats = CallStats(call, scope)
val stats = CallStats(call)

private val participantsUpdate = TaskSchedulerWithDebounce()
private val participantsUpdateConfig = ScheduleConfig(
Expand Down Expand Up @@ -413,7 +423,9 @@ public class CallState(
),
)
val livestream: StateFlow<ParticipantState.Video?> =
livestreamFlow.debounce(1000).stateIn(scope, SharingStarted.WhileSubscribed(10_000L), null)
livestreamFlow.debounce(
1000,
).stateIn(stateScope, SharingStarted.WhileSubscribed(10_000L), null)

private var _sortedParticipantsState = SortedParticipantsState(
scope,
Expand Down Expand Up @@ -503,11 +515,11 @@ public class CallState(
/** how long the call has been running, rounded to seconds, null if the call didn't start yet */
public val duration: StateFlow<Duration?> =
_durationInMs.transform { emit(((it ?: 0L) / 1000L).toDuration(DurationUnit.SECONDS)) }
.stateIn(scope, SharingStarted.WhileSubscribed(10000L), null)
.stateIn(stateScope, SharingStarted.WhileSubscribed(10000L), null)

/** how many milliseconds the call has been running, null if the call didn't start yet */
public val durationInMs: StateFlow<Long?> =
_durationInMs.stateIn(scope, SharingStarted.WhileSubscribed(10000L), null)
_durationInMs.stateIn(stateScope, SharingStarted.WhileSubscribed(10000L), null)

/** how many milliseconds the call has been running in the simple date format. */
public val durationInDateFormat: StateFlow<String?> = durationInMs.mapState { durationInMs ->
Expand Down Expand Up @@ -570,7 +582,7 @@ public class CallState(
emit(duration)
}
}
}.distinctUntilChanged().stateIn(scope, SharingStarted.WhileSubscribed(10000L), null)
}.distinctUntilChanged().stateIn(stateScope, SharingStarted.WhileSubscribed(10000L), null)

/**
* How long the call has been live for, represented as [Duration], or null if the call hasn't been live yet.
Expand Down Expand Up @@ -1469,7 +1481,6 @@ public class CallState(
} else {
ParticipantState(
sessionId = sessionId,
scope = scope,
callActions = callActions,
initialUserId = userId,
source = source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ public data class LocalStats(
val deviceModel: String,
)

public class CallStats(val call: Call, val callScope: CoroutineScope) {
public class CallStats(val call: Call) {
private val logger by taggedLogger("CallStats")

private val supervisorJob = SupervisorJob()
private val scope = CoroutineScope(callScope.coroutineContext + supervisorJob)
private val scope = CoroutineScope(call.stateScope.coroutineContext + supervisorJob)
// TODO: cleanup the scope

val publisher = PeerConnectionStats(scope)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ import stream.video.sfu.models.TrackType
public data class ParticipantState(
/** The SFU returns a session id for each participant. This session id is unique */
var sessionId: String = "",
/** The coroutine scope for this participant */
private val scope: CoroutineScope,
/** The call actions interface for performing operations on this participant */
private val callActions: CallActions,
/** The current version of the user, this is the start for participant.user stateflow */
Expand All @@ -63,6 +61,9 @@ public data class ParticipantState(
@InternalStreamVideoApi
var trackLookupPrefix: String = "",
) {
/** The coroutine scope for StateFlows - accessed via callActions.stateScope */
private val scope: CoroutineScope
get() = callActions.stateScope

private val logger by taggedLogger("ParticipantState")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public val previewCall: Call = Call(
ParticipantState(
initialUserId = user.id,
sessionId = sessionId,
scope = this.state.scope,
callActions = this.state.callActions,
)
}
Expand Down Expand Up @@ -129,7 +128,6 @@ public val previewParticipantsList: List<ParticipantState>
ParticipantState(
initialUserId = user.id,
sessionId = sessionId,
scope = previewCall.state.scope,
callActions = previewCall.state.callActions,
).also { previewCall.state.updateParticipant(it) },
)
Expand Down
Loading