From bc6a1eadc80678fe9d91a412e2a8d06658a1a871 Mon Sep 17 00:00:00 2001 From: Tyler Date: Sat, 7 Mar 2026 05:09:48 -0500 Subject: [PATCH 1/2] [feature] Add support for Create events to better distinguish adds/updates --- Valour/Sdk/Nodes/Node.cs | 358 +++++++++--------- Valour/Server/Services/CoreHubService.cs | 3 + Valour/Server/Services/PlanetMemberService.cs | 3 +- 3 files changed, 189 insertions(+), 175 deletions(-) diff --git a/Valour/Sdk/Nodes/Node.cs b/Valour/Sdk/Nodes/Node.cs index 75c5d6b2b..374f431e4 100644 --- a/Valour/Sdk/Nodes/Node.cs +++ b/Valour/Sdk/Nodes/Node.cs @@ -71,12 +71,12 @@ public Node(ValourClient client) _nodeService = client.NodeService; } - public async Task InitializeAsync(string name, bool isPrimary = false) - { - Name = name?.Trim(); - if (string.IsNullOrWhiteSpace(Name)) - return TaskResult.FromFailure("Node name was empty."); - IsPrimary = isPrimary; + public async Task InitializeAsync(string name, bool isPrimary = false) + { + Name = name?.Trim(); + if (string.IsNullOrWhiteSpace(Name)) + return TaskResult.FromFailure("Node name was empty."); + IsPrimary = isPrimary; var logOptions = new LogOptions( "Node " + Name, @@ -101,7 +101,7 @@ public async Task InitializeAsync(string name, bool isPrimary = fals HttpClient.BaseAddress = new Uri(Client.BaseAddress); // Set header for node - HttpClient.DefaultRequestHeaders.Add("X-Server-Select", Name); + HttpClient.DefaultRequestHeaders.Add("X-Server-Select", Name); if (Client.AuthService.Token is null) { @@ -349,68 +349,68 @@ private void OnPingTimer(object state = null) _ = OnPingTimerAsync(); } - private async Task OnPingTimerAsync() - { - try - { - if (HubConnection.State != HubConnectionState.Connected) - { - LogError($"Ping failed. Hub state is: {HubConnection.State.ToString()}"); - if (HubConnection.State == HubConnectionState.Disconnected) - { - _ = Reconnect(); - } - return; - } + private async Task OnPingTimerAsync() + { + try + { + if (HubConnection.State != HubConnectionState.Connected) + { + LogError($"Ping failed. Hub state is: {HubConnection.State.ToString()}"); + if (HubConnection.State == HubConnectionState.Disconnected) + { + _ = Reconnect(); + } + return; + } Log("Doing node ping..."); - _pingStopwatch.Reset(); - _pingStopwatch.Start(); - var response = await HubConnection.InvokeAsync("ping", IsPrimary); - _pingStopwatch.Stop(); - - if (response == "pong") - { - Log($"Pinged successfully in {_pingStopwatch.ElapsedMilliseconds}ms"); - } - else - { - LogError($"Ping failed. Response: {response}"); - await ForceReconnectAfterHeartbeatFailure(); - } - } - catch (Exception ex) - { - LogError("Ping failed.", ex); - if (ShouldForceReconnectForHeartbeatException(ex)) - { - await ForceReconnectAfterHeartbeatFailure(); - } - else - { - LogWarning("Ping invocation failed due to server-side hub error; skipping forced reconnect."); - } - } - } - - private async Task ForceReconnectAfterHeartbeatFailure() - { - if (IsReconnecting || HubConnection is null) - return; - - try - { - if (HubConnection.State != HubConnectionState.Disconnected) - await HubConnection.StopAsync(); - } - catch (Exception stopEx) - { - LogError("Failed to stop SignalR during heartbeat recovery.", stopEx); - } - - _ = Reconnect(); - } + _pingStopwatch.Reset(); + _pingStopwatch.Start(); + var response = await HubConnection.InvokeAsync("ping", IsPrimary); + _pingStopwatch.Stop(); + + if (response == "pong") + { + Log($"Pinged successfully in {_pingStopwatch.ElapsedMilliseconds}ms"); + } + else + { + LogError($"Ping failed. Response: {response}"); + await ForceReconnectAfterHeartbeatFailure(); + } + } + catch (Exception ex) + { + LogError("Ping failed.", ex); + if (ShouldForceReconnectForHeartbeatException(ex)) + { + await ForceReconnectAfterHeartbeatFailure(); + } + else + { + LogWarning("Ping invocation failed due to server-side hub error; skipping forced reconnect."); + } + } + } + + private async Task ForceReconnectAfterHeartbeatFailure() + { + if (IsReconnecting || HubConnection is null) + return; + + try + { + if (HubConnection.State != HubConnectionState.Disconnected) + await HubConnection.StopAsync(); + } + catch (Exception stopEx) + { + LogError("Failed to stop SignalR during heartbeat recovery.", stopEx); + } + + _ = Reconnect(); + } #region SignalR @@ -436,7 +436,7 @@ private async Task ConnectSignalRHub() }) .Build(); - HubConnection.ServerTimeout = TimeSpan.FromSeconds(60); + HubConnection.ServerTimeout = TimeSpan.FromSeconds(60); HubConnection.Closed += OnSignalRClosed; HubConnection.Reconnected += OnHubReconnect; @@ -496,10 +496,20 @@ private void HookModelEvents(TModel model) where TModel : ClientModel { var typeName = model.GetType().Name; + HubConnection.On($"{typeName}-Create", OnModelCreate); HubConnection.On($"{typeName}-Update", OnModelUpdate); HubConnection.On($"{typeName}-Delete", OnModelDelete); } + /// + /// Specific model create event, mocks update events for compatibility + /// + private void OnModelCreate(TModel model) + where TModel : ClientModel + { + model.Sync(Client); + } + /// /// Specific model update event /// @@ -585,100 +595,100 @@ public void HookSignalREvents() /// /// Forces SignalR to refresh the underlying connection /// - public void CheckConnection() - { - if (!IsRealtimeSetup || HubConnection is null) - return; - - _ = CheckConnectionInternal(); - } - - private async Task CheckConnectionInternal() - { - if (IsReconnecting || HubConnection is null) - return; - - LogWarning("Refresh has been requested."); - LogWarning("SignalR state is " + HubConnection.State); - - if (HubConnection.State == HubConnectionState.Disconnected) - { - LogError("Disconnect has been detected. Reconnecting..."); - _ = Reconnect(); - return; - } - - if (HubConnection.State != HubConnectionState.Connected) - { - LogWarning("SignalR not connected; skipping heartbeat check."); - return; - } - - try - { - var pingTask = HubConnection.InvokeAsync("ping"); - var completed = await Task.WhenAny(pingTask, Task.Delay(TimeSpan.FromSeconds(6))); - - if (completed != pingTask) - throw new TimeoutException("SignalR ping timed out."); - - var response = await pingTask; - if (response != "pong") - throw new Exception("Unexpected ping response: " + response); - } - catch (Exception ex) - { - if (!ShouldForceReconnectForHeartbeatException(ex)) - { - LogWarning("SignalR heartbeat ping failed with a non-transport server error; keeping the current connection."); - return; - } - - LogError("SignalR heartbeat failed. Forcing reconnect...", ex); - - try - { - await HubConnection.StopAsync(); - } - catch (Exception stopEx) - { - LogError("Failed to stop SignalR before reconnect.", stopEx); - } - - _ = Reconnect(); - } - } - - private static bool ShouldForceReconnectForHeartbeatException(Exception ex) - { - // Heartbeat checks should only force reconnect for transport-level failures. - // Server-side invocation errors can be transient business-logic issues and - // reconnecting repeatedly causes message loss windows. - if (IsPingInvocationFailure(ex)) - return false; - - if (ex is TimeoutException || ex is TaskCanceledException || ex is OperationCanceledException) - return false; - - return true; - } - - private static bool IsPingInvocationFailure(Exception ex) - { - var current = ex; - while (current is not null) - { - if (!string.IsNullOrWhiteSpace(current.Message) && - current.Message.Contains("Failed to invoke 'Ping'", StringComparison.OrdinalIgnoreCase)) - { - return true; - } - - current = current.InnerException; - } - - return false; - } + public void CheckConnection() + { + if (!IsRealtimeSetup || HubConnection is null) + return; + + _ = CheckConnectionInternal(); + } + + private async Task CheckConnectionInternal() + { + if (IsReconnecting || HubConnection is null) + return; + + LogWarning("Refresh has been requested."); + LogWarning("SignalR state is " + HubConnection.State); + + if (HubConnection.State == HubConnectionState.Disconnected) + { + LogError("Disconnect has been detected. Reconnecting..."); + _ = Reconnect(); + return; + } + + if (HubConnection.State != HubConnectionState.Connected) + { + LogWarning("SignalR not connected; skipping heartbeat check."); + return; + } + + try + { + var pingTask = HubConnection.InvokeAsync("ping"); + var completed = await Task.WhenAny(pingTask, Task.Delay(TimeSpan.FromSeconds(6))); + + if (completed != pingTask) + throw new TimeoutException("SignalR ping timed out."); + + var response = await pingTask; + if (response != "pong") + throw new Exception("Unexpected ping response: " + response); + } + catch (Exception ex) + { + if (!ShouldForceReconnectForHeartbeatException(ex)) + { + LogWarning("SignalR heartbeat ping failed with a non-transport server error; keeping the current connection."); + return; + } + + LogError("SignalR heartbeat failed. Forcing reconnect...", ex); + + try + { + await HubConnection.StopAsync(); + } + catch (Exception stopEx) + { + LogError("Failed to stop SignalR before reconnect.", stopEx); + } + + _ = Reconnect(); + } + } + + private static bool ShouldForceReconnectForHeartbeatException(Exception ex) + { + // Heartbeat checks should only force reconnect for transport-level failures. + // Server-side invocation errors can be transient business-logic issues and + // reconnecting repeatedly causes message loss windows. + if (IsPingInvocationFailure(ex)) + return false; + + if (ex is TimeoutException || ex is TaskCanceledException || ex is OperationCanceledException) + return false; + + return true; + } + + private static bool IsPingInvocationFailure(Exception ex) + { + var current = ex; + while (current is not null) + { + if (!string.IsNullOrWhiteSpace(current.Message) && + current.Message.Contains("Failed to invoke 'Ping'", StringComparison.OrdinalIgnoreCase)) + { + return true; + } + + current = current.InnerException; + } + + return false; + } /// /// Reconnects the SignalR connection @@ -698,18 +708,18 @@ private async Task Reconnect() _ = await HubConnection.InvokeAsync("ping"); } } - catch (System.Exception ex) - { - LogError("Hub reports connection, but ping failed. Will attempt reconnect...", ex); - try - { - await HubConnection.StopAsync(); - } - catch (System.Exception stopEx) - { - LogError("Failed to stop SignalR after ping failure.", stopEx); - } - } + catch (System.Exception ex) + { + LogError("Hub reports connection, but ping failed. Will attempt reconnect...", ex); + try + { + await HubConnection.StopAsync(); + } + catch (System.Exception stopEx) + { + LogError("Failed to stop SignalR after ping failure.", stopEx); + } + } while (HubConnection.State == HubConnectionState.Disconnected) { @@ -1402,4 +1412,4 @@ private async Task> TryDeserializeResponse(HttpResponseMessage } #endregion -} +} diff --git a/Valour/Server/Services/CoreHubService.cs b/Valour/Server/Services/CoreHubService.cs index e659cd192..5bad1a95b 100644 --- a/Valour/Server/Services/CoreHubService.cs +++ b/Valour/Server/Services/CoreHubService.cs @@ -216,6 +216,9 @@ public void NotifyVoiceChannelParticipants(long planetId, VoiceChannelParticipan public void NotifyPlanetItemChange(long planetId, T model, int flags = 0) => _ = _hub.Clients.Group($"p-{planetId}").SendAsync($"{typeof(T).Name}-Update", model, flags); + public async void NotifyPlanetItemCreate(T model, int flags = 0) where T : ISharedPlanetModel => + await _hub.Clients.Group($"p-{model.PlanetId}").SendAsync($"{typeof(T).Name}-Create", model, flags); + public async void NotifyPlanetItemChange(T model, int flags = 0) where T : ISharedPlanetModel => await _hub.Clients.Group($"p-{model.PlanetId}").SendAsync($"{typeof(T).Name}-Update", model, flags); diff --git a/Valour/Server/Services/PlanetMemberService.cs b/Valour/Server/Services/PlanetMemberService.cs index 3e2f72273..9e8b26ebd 100644 --- a/Valour/Server/Services/PlanetMemberService.cs +++ b/Valour/Server/Services/PlanetMemberService.cs @@ -378,7 +378,8 @@ public async Task> AddMemberAsync(long planetId, long u var model = member.ToModel(); - _coreHub.NotifyPlanetItemChange(model); + // Notify subscribers that an item has been created (Member added) + _coreHub.NotifyPlanetItemCreate(model); await _automodService.HandleMemberJoinAsync(model); From e6e44b0665250484b9743f128fb4eb4d768bfcdf Mon Sep 17 00:00:00 2001 From: Tyler Date: Sat, 7 Mar 2026 05:18:39 -0500 Subject: [PATCH 2/2] [feature] Update more models to use Create events --- Valour/Server/Services/AutomodService.cs | 8 ++++---- Valour/Server/Services/ChannelService.cs | 4 ++-- Valour/Server/Services/CoreHubService.cs | 3 +++ Valour/Server/Services/PlanetBanService.cs | 2 +- Valour/Server/Services/PlanetEmojiService.cs | 4 ++-- Valour/Server/Services/PlanetInviteService.cs | 2 +- Valour/Server/Services/PlanetRoleService.cs | 2 +- 7 files changed, 14 insertions(+), 11 deletions(-) diff --git a/Valour/Server/Services/AutomodService.cs b/Valour/Server/Services/AutomodService.cs index e5ec546f6..f5f72365d 100644 --- a/Valour/Server/Services/AutomodService.cs +++ b/Valour/Server/Services/AutomodService.cs @@ -96,7 +96,7 @@ public async Task> CreateTriggerAsync(AutomodTrigger // Invalidate cache _triggerCache.TryRemove(trigger.PlanetId, out _); - _coreHub.NotifyPlanetItemChange(trigger); + _coreHub.NotifyPlanetItemCreate(trigger); return new(true, "Success", trigger); } @@ -131,9 +131,9 @@ public async Task> CreateTriggerWithActionsAsync(Auto _triggerCache.TryRemove(trigger.PlanetId, out _); _actionCache.TryRemove(trigger.Id, out _); - _coreHub.NotifyPlanetItemChange(trigger); + _coreHub.NotifyPlanetItemCreate(trigger); foreach (var action in actions) - _coreHub.NotifyPlanetItemChange(action.PlanetId, action); + _coreHub.NotifyPlanetItemCreate(action.PlanetId, action); return new(true, "Success", trigger); } @@ -220,7 +220,7 @@ public async Task> CreateActionAsync(AutomodAction act // Invalidate cache _actionCache.TryRemove(action.TriggerId, out _); - _coreHub.NotifyPlanetItemChange(action.PlanetId, action); + _coreHub.NotifyPlanetItemCreate(action.PlanetId, action); return new(true, "Success", action); } diff --git a/Valour/Server/Services/ChannelService.cs b/Valour/Server/Services/ChannelService.cs index ac91dc3c8..591aa7a47 100644 --- a/Valour/Server/Services/ChannelService.cs +++ b/Valour/Server/Services/ChannelService.cs @@ -382,10 +382,10 @@ public async Task> CreateAsync(Channel channel, List.FromData(channel); diff --git a/Valour/Server/Services/CoreHubService.cs b/Valour/Server/Services/CoreHubService.cs index 5bad1a95b..944ccbd81 100644 --- a/Valour/Server/Services/CoreHubService.cs +++ b/Valour/Server/Services/CoreHubService.cs @@ -213,6 +213,9 @@ public void NotifyVoiceSessionReplace(long userId, VoiceSessionReplaceEvent upda public void NotifyVoiceChannelParticipants(long planetId, VoiceChannelParticipantsUpdate update) => _ = _hub.Clients.Group($"p-{planetId}").SendAsync("Voice-Channel-Participants", update); + public void NotifyPlanetItemCreate(long planetId, T model, int flags = 0) => + _ = _hub.Clients.Group($"p-{planetId}").SendAsync($"{typeof(T).Name}-Create", model, flags); + public void NotifyPlanetItemChange(long planetId, T model, int flags = 0) => _ = _hub.Clients.Group($"p-{planetId}").SendAsync($"{typeof(T).Name}-Update", model, flags); diff --git a/Valour/Server/Services/PlanetBanService.cs b/Valour/Server/Services/PlanetBanService.cs index fc86a6293..03e722e26 100644 --- a/Valour/Server/Services/PlanetBanService.cs +++ b/Valour/Server/Services/PlanetBanService.cs @@ -97,7 +97,7 @@ public async Task> CreateAsync( await tran.CommitAsync(); // Notify of changes - _coreHub.NotifyPlanetItemChange(ban); + _coreHub.NotifyPlanetItemCreate(ban); _coreHub.NotifyPlanetItemDelete(target); var actorUserId = source == ModerationActionSource.Automod diff --git a/Valour/Server/Services/PlanetEmojiService.cs b/Valour/Server/Services/PlanetEmojiService.cs index 8fa419052..0200f9389 100644 --- a/Valour/Server/Services/PlanetEmojiService.cs +++ b/Valour/Server/Services/PlanetEmojiService.cs @@ -87,7 +87,7 @@ public async Task> CreateAsync( hosted.UpsertEmoji(model); if (notify) - _coreHub.NotifyPlanetItemChange(model); + _coreHub.NotifyPlanetItemCreate(model); return TaskResult.FromData(model); } @@ -122,7 +122,7 @@ public async Task DeleteAsync(long planetId, long emojiId, bool noti public void NotifyCreated(PlanetEmoji emoji) { - _coreHub.NotifyPlanetItemChange(emoji); + _coreHub.NotifyPlanetItemCreate(emoji); } public async Task AreAllIdsValidForPlanetAsync(long planetId, IEnumerable ids) diff --git a/Valour/Server/Services/PlanetInviteService.cs b/Valour/Server/Services/PlanetInviteService.cs index 6793b9bde..863b80dd9 100644 --- a/Valour/Server/Services/PlanetInviteService.cs +++ b/Valour/Server/Services/PlanetInviteService.cs @@ -53,7 +53,7 @@ public async Task> CreateAsync(PlanetInvite invite, Pla return new(false, e.Message); } - _coreHub.NotifyPlanetItemChange(invite); + _coreHub.NotifyPlanetItemCreate(invite); return new(true, "Success", invite); } diff --git a/Valour/Server/Services/PlanetRoleService.cs b/Valour/Server/Services/PlanetRoleService.cs index 0e0435a93..76c5cd287 100644 --- a/Valour/Server/Services/PlanetRoleService.cs +++ b/Valour/Server/Services/PlanetRoleService.cs @@ -108,7 +108,7 @@ public async Task> CreateAsync(PlanetRole role) } hostedPlanet.UpsertRole(role); - _coreHub.NotifyPlanetItemChange(role); + _coreHub.NotifyPlanetItemCreate(role); // If we bumped the default role, update cache and notify clients if (defaultRoleUpdated)