Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions common/src/main/java/com/ridestr/common/nostr/NostrService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,20 @@ class NostrService internal constructor(
relayManager.ensureConnected()
}

/**
* Force every relay to tear down and reopen, resetting reconnect backoff.
* Use from the manual "Reconnect to Relays" UI — does NOT have the
* dispatcher-starvation / backoff-poisoning problems of disconnectAll+connectAll.
*
* @param newRelayUrls Optional updated relay list (e.g., from
* `settingsRepository.getEffectiveRelays()`) so user edits to custom relays
* take effect without an app restart.
*/
fun forceReconnect(newRelayUrls: List<String>? = null) {
Log.d(TAG, "Force-reconnecting all relays (newRelayUrls=${newRelayUrls?.size ?: "unchanged"})")
relayManager.forceReconnectAll(newRelayUrls)
}

/**
* Clear all subscriptions. Use for debugging or to reset state.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ object RelayConfig {
* Time to wait before attempting reconnection after failure.
*/
const val RECONNECT_DELAY_MS = 5_000L

/**
* WebSocket ping interval. Without pings, NAT/wifi transitions can leave
* the client believing a connection is alive when it isn't.
*/
const val PING_INTERVAL_MS = 30_000L
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ridestr.common.nostr.relay

import android.util.Log
import androidx.annotation.VisibleForTesting
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.crypto.verify
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -69,6 +70,24 @@ class RelayConnection(
private val _state = MutableStateFlow(RelayConnectionState.DISCONNECTED)
val state: StateFlow<RelayConnectionState> = _state.asStateFlow()

/** Test-only accessor for the auto-reconnect backoff counter. */
@VisibleForTesting
internal fun reconnectAttemptsForTest(): Int = reconnectAttempts.get()

/** Test-only mutator to simulate prior auto-retry failures. */
@VisibleForTesting
internal fun setReconnectAttemptsForTest(value: Int) {
reconnectAttempts.set(value)
}

/** Test-only accessor for the generation counter. */
@VisibleForTesting
internal fun connectionGenerationForTest(): Long = synchronized(this) { connectionGeneration }

/** Test-only accessor for the set of subscription IDs this connection has stored. */
@VisibleForTesting
internal fun activeSubscriptionIdsForTest(): Set<String> = activeSubscriptions.keys.toSet()

private val activeSubscriptions = ConcurrentHashMap<String, String>() // subId -> filterJson
private val pendingEvents = ConcurrentHashMap<String, Event>() // eventId -> event

Expand Down Expand Up @@ -115,6 +134,50 @@ class RelayConnection(
Log.d(TAG, "Disconnected from $url")
}

/**
* Tear down the current socket and reopen immediately, resetting the reconnect backoff.
*
* Used by the manual "Reconnect to Relays" UI. Differs from `disconnect()+connect()`:
* 1. `cancel()` (not graceful `close()`) — frees the OkHttp dispatcher slot immediately
* even if the prior socket was mid-handshake. Graceful close can linger up to
* `readTimeout` (30s) while the close handshake races with new connection attempts.
* 2. Resets `reconnectAttempts` so the auto-retry backoff (capped at 60s) restarts at 0,
* preventing manual reconnects from being shadowed by long-pending scheduled retries.
* 3. Single-locked transition keeps `socket`, `state`, and `connectionGeneration`
* coherent so the stale-callback guards in `RelayWebSocketListener` work correctly.
*/
fun forceReconnect() {
val socketToCancel: WebSocket?

synchronized(this) {
socketToCancel = socket
socket = null

// Bump generation so any in-flight messages/callbacks from the prior socket
// are recognized as stale.
connectionGeneration++

// Reset backoff — any scheduled-reconnect coroutine still sleeping will see
// shouldReconnect=true and either complete its connect() (no-op because we're
// now CONNECTING) or be preempted by ours.
reconnectAttempts.set(0)
shouldReconnect.set(true)
_state.value = RelayConnectionState.CONNECTING

val request = Request.Builder()
.url(url)
.build()
socket = client.newWebSocket(request, RelayWebSocketListener())
}

// Cancel outside the lock to avoid holding it during network I/O.
// `cancel()` is non-graceful and releases resources immediately, unlike `close()`
// which initiates a close handshake that can hang for up to readTimeout.
socketToCancel?.cancel()

Log.d(TAG, "Force-reconnecting to $url (generation $connectionGeneration)")
}

/**
* Send a subscription request to the relay.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class RelayManager(
.connectTimeout(RelayConfig.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.readTimeout(RelayConfig.READ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.writeTimeout(RelayConfig.WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
// Send WebSocket pings so NAT/firewall transitions don't leave zombie sockets
// that the client thinks are CONNECTED but won't deliver messages.
// Matches CashuWebSocket's 30s interval.
.pingInterval(RelayConfig.PING_INTERVAL_MS, TimeUnit.MILLISECONDS)
.build()

private val connections = ConcurrentHashMap<String, RelayConnection>()
Expand Down Expand Up @@ -90,6 +94,17 @@ class RelayManager(

connections[url] = connection

// Seed the new connection with every active subscription so when it opens,
// resubscribeAll() actually has something to send. Without this, a relay
// added mid-session (e.g., via forceReconnectAll's syncRelayList) would
// come up CONNECTED but receive zero events — `subscriptions` is the
// authoritative registry, but `subscribe()` only fans out to connections
// that existed at call time. At construction time this loop is a no-op
// because subscriptions is empty.
subscriptions.values.forEach { subscription ->
connection.subscribe(subscription.id, subscription.filters)
}

// Watch connection state changes
scope.launch {
connection.state.collect { state ->
Expand All @@ -98,7 +113,7 @@ class RelayManager(
}

updateConnectionStates()
Log.d(TAG, "Added relay: $url")
Log.d(TAG, "Added relay: $url (seeded with ${subscriptions.size} subscription(s))")
}

/**
Expand Down Expand Up @@ -126,6 +141,46 @@ class RelayManager(
connections.values.forEach { it.disconnect() }
}

/**
* Force-reconnect every relay, resetting backoff and replacing any stale socket.
*
* Use this from the manual "Reconnect to Relays" UI rather than
* `disconnectAll()+connectAll()`. The compound call is not equivalent:
*
* - `disconnectAll()` uses graceful `WebSocket.close()`, which can keep an OkHttp
* dispatcher slot occupied for up to `readTimeout` (30s) while the close handshake
* completes. A rapid `connectAll()` afterwards starts new sockets that compete with
* the dying ones, and the backoff state (`reconnectAttempts`) isn't reset.
* - `forceReconnectAll()` per-relay: `cancel()`s the old socket (immediate teardown),
* resets `reconnectAttempts` to 0, and opens a new socket in a single synchronized
* transition so stale-callback guards in the listener fire reliably.
*
* @param newRelayUrls Optional updated relay list. If provided, relays not in the list
* are removed and new relays are added BEFORE force-reconnecting. Use this when the
* user has edited custom relays in settings so the changes take effect without an
* app restart.
*/
fun forceReconnectAll(newRelayUrls: List<String>? = null) {
if (newRelayUrls != null) {
syncRelayList(newRelayUrls)
}
Log.d(TAG, "Force-reconnecting all ${connections.size} relay(s)")
connections.values.forEach { it.forceReconnect() }
}

/**
* Reconcile the connection set with a target list. Removes connections that aren't
* in the target and adds new ones. Existing connections present in the target stay.
*/
private fun syncRelayList(target: List<String>) {
val targetSet = target.toSet()
// Remove any that are no longer wanted.
val toRemove = connections.keys - targetSet
toRemove.forEach { removeRelay(it) }
// Add any new ones (addRelay is idempotent for existing entries).
target.forEach { url -> addRelay(url) }
}

/**
* Publish an event to all connected relays.
*/
Expand Down Expand Up @@ -348,6 +403,10 @@ class RelayManager(
*/
fun getRelayUrls(): List<String> = connections.keys.toList()

/** Test-only accessor for a specific connection. */
@androidx.annotation.VisibleForTesting
internal fun connectionForTest(url: String): RelayConnection? = connections[url]

private fun handleEvent(event: Event, subscriptionId: String, relayUrl: String) {
Log.d(TAG, "Received event ${event.id} (kind ${event.kind}) from $relayUrl")

Expand Down
51 changes: 41 additions & 10 deletions common/src/main/java/com/ridestr/common/ui/RelayManagementScreen.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ import androidx.compose.ui.unit.dp
import androidx.compose.foundation.background
import androidx.compose.foundation.shape.CircleShape
import com.ridestr.common.nostr.relay.RelayConnectionState
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withTimeoutOrNull

/**
* Standalone relay management screen accessible from:
Expand All @@ -46,7 +45,44 @@ fun RelayManagementScreen(
) {
BackHandler(onBack = onBack)

var isReconnecting by remember { mutableStateOf(false) }
// Tracks "we just kicked off a reconnect" so the button doesn't no-op visually.
// Auto-clears either when at least one relay comes back CONNECTED after the
// forced disconnect propagates, or after a hard cap so a totally unreachable
// relay set doesn't lock the button forever.
var reconnectInitiatedAt by remember { mutableStateOf<Long?>(null) }
val isReconnecting = reconnectInitiatedAt != null

// `connectedCount` is a regular composable parameter, recomputed per
// recomposition. Wrap it in `rememberUpdatedState` so `snapshotFlow` inside
// the LaunchedEffect can observe its changes without re-keying the effect
// (which would cancel/restart and lose the "transition seen" state).
val currentConnectedCount by rememberUpdatedState(connectedCount)

LaunchedEffect(reconnectInitiatedAt) {
val started = reconnectInitiatedAt ?: return@LaunchedEffect
val deadline = started + 15_000L
val timeBudget = { (deadline - System.currentTimeMillis()).coerceAtLeast(0) }
try {
// Phase 1: wait for the forced disconnect to be visible
// (connectedCount drops to 0). Without this, pre-existing CONNECTED
// state would let Phase 2 resolve instantly and the spinner would
// never appear. Cross-dispatcher (Dispatchers.IO -> Main) state
// propagation means `connectedCount` can briefly look stale right
// after the press; this wait absorbs that window.
withTimeoutOrNull(timeBudget()) {
snapshotFlow { currentConnectedCount }.first { it == 0 }
}
// Phase 2: wait for at least one relay to come back online, or hit
// the cap. If Phase 1 timed out (e.g., reconnect was a no-op),
// `currentConnectedCount` is already non-zero and this resolves
// immediately.
withTimeoutOrNull(timeBudget()) {
snapshotFlow { currentConnectedCount }.first { it > 0 }
}
} finally {
reconnectInitiatedAt = null
}
}
var newRelayInput by remember { mutableStateOf("") }

Scaffold(
Expand Down Expand Up @@ -122,13 +158,8 @@ fun RelayManagementScreen(
if (onReconnect != null) {
Button(
onClick = {
isReconnecting = true
reconnectInitiatedAt = System.currentTimeMillis()
onReconnect()
// Reset after a brief delay (reconnection is async)
MainScope().launch {
delay(2000)
isReconnecting = false
}
},
enabled = !isReconnecting,
modifier = Modifier
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.ridestr.common.nostr.relay

import okhttp3.OkHttpClient
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
import org.junit.runner.RunWith
import org.robolectric.RobolectricTestRunner
import java.util.concurrent.TimeUnit

/**
* Regression coverage for the manual-reconnect bug
* (variablefate/ridestr#86): "Reconnect to Relays" disconnected from all and never recovered.
*
* Verifies the two invariants that the broken `disconnectAll() + connectAll()` path
* was violating:
* 1. Force-reconnect resets the auto-retry backoff counter so a manual press
* isn't shadowed by long-pending scheduled retries.
* 2. Force-reconnect bumps the connection generation so any in-flight
* callbacks/messages from the prior socket are recognized as stale.
*
* These tests use a real OkHttpClient pointed at an invalid URL so no
* network I/O is needed — we only assert on in-memory state mutations.
*/
@RunWith(RobolectricTestRunner::class)
class RelayConnectionForceReconnectTest {

/** Cheap client; we never actually open a working connection in this test. */
private val client: OkHttpClient = OkHttpClient.Builder()
.connectTimeout(100, TimeUnit.MILLISECONDS)
.readTimeout(100, TimeUnit.MILLISECONDS)
.build()

private fun newConnection(): RelayConnection = RelayConnection(
url = "wss://invalid.localhost.invalid:1/",
client = client,
onEvent = { _, _, _ -> },
onEose = { _, _ -> },
onOk = { _, _, _, _ -> },
onNotice = { _, _ -> }
)

@Test
fun `forceReconnect resets reconnectAttempts to zero`() {
val connection = newConnection()

// Simulate prior auto-retry failures having ramped backoff up.
connection.setReconnectAttemptsForTest(12)
assertEquals(12, connection.reconnectAttemptsForTest())

connection.forceReconnect()

// Backoff must be reset so the user's manual press triggers an immediate retry
// rather than waiting on the 60s scheduled-retry slot.
assertEquals(
"forceReconnect must reset auto-retry backoff so manual reconnects aren't " +
"shadowed by long-pending scheduled retries",
0,
connection.reconnectAttemptsForTest()
)
}

@Test
fun `forceReconnect bumps connection generation`() {
val connection = newConnection()
val genBefore = connection.connectionGenerationForTest()

connection.forceReconnect()

val genAfter = connection.connectionGenerationForTest()
assertTrue(
"Generation must strictly increase so stale callbacks from prior socket " +
"are filtered out (before=$genBefore, after=$genAfter)",
genAfter > genBefore
)
}

@Test
fun `forceReconnect transitions state to CONNECTING`() {
val connection = newConnection()

// Initial state is DISCONNECTED.
assertEquals(RelayConnectionState.DISCONNECTED, connection.state.value)

connection.forceReconnect()

assertEquals(
"forceReconnect must immediately move state to CONNECTING — the prior " +
"disconnectAll+connectAll path could leave it in DISCONNECTED if a " +
"scheduled-retry coroutine raced with the user's press",
RelayConnectionState.CONNECTING,
connection.state.value
)
}

@Test
fun `forceReconnect after disconnect re-enables auto-reconnect`() {
val connection = newConnection()

// disconnect() sets shouldReconnect=false. Anything left over from a logout
// path could otherwise sneak in and suppress future retries.
connection.disconnect()
assertEquals(RelayConnectionState.DISCONNECTED, connection.state.value)

// Bump backoff to verify reset happens even from a fully-torn-down state.
connection.setReconnectAttemptsForTest(7)

connection.forceReconnect()

assertEquals(0, connection.reconnectAttemptsForTest())
assertEquals(RelayConnectionState.CONNECTING, connection.state.value)
}
}
Loading