From d1eb2dd4b029c4cf8a8d23f1fc04442c12f0c8c9 Mon Sep 17 00:00:00 2001 From: Tomas Date: Thu, 18 Jun 2026 22:53:47 +0200 Subject: [PATCH 1/3] Add high-event performance benchmark baseline --- .../Benchmarks/QueuedObservableBenchmarks.cs | 38 ++++++++ .../Benchmarks/ResultDispatchBenchmarks.cs | 57 +++++++++++ .../Benchmarks/StateChangeJsonBenchmarks.cs | 34 +++++++ .../Benchmarks/WebSocketPipelineBenchmarks.cs | 44 +++++++++ .../NetDaemon.PerformanceBenchmarks.csproj | 21 ++++ .../Program.cs | 7 ++ .../Support/BenchmarkWebSocketClient.cs | 77 +++++++++++++++ .../Support/HassPayloadFactory.cs | 64 +++++++++++++ docs/performance/high-event-review.md | 95 +++++++++++++++++++ .../NetDaemon.Client.csproj | 6 ++ .../NetDaemon.HassModel.csproj | 3 + 11 files changed, 446 insertions(+) create mode 100644 benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/QueuedObservableBenchmarks.cs create mode 100644 benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/ResultDispatchBenchmarks.cs create mode 100644 benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/StateChangeJsonBenchmarks.cs create mode 100644 benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/WebSocketPipelineBenchmarks.cs create mode 100644 benchmarks/NetDaemon.PerformanceBenchmarks/NetDaemon.PerformanceBenchmarks.csproj create mode 100644 benchmarks/NetDaemon.PerformanceBenchmarks/Program.cs create mode 100644 benchmarks/NetDaemon.PerformanceBenchmarks/Support/BenchmarkWebSocketClient.cs create mode 100644 benchmarks/NetDaemon.PerformanceBenchmarks/Support/HassPayloadFactory.cs create mode 100644 docs/performance/high-event-review.md diff --git a/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/QueuedObservableBenchmarks.cs b/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/QueuedObservableBenchmarks.cs new file mode 100644 index 000000000..6a0f9eef4 --- /dev/null +++ b/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/QueuedObservableBenchmarks.cs @@ -0,0 +1,38 @@ +using System.Reactive.Subjects; +using BenchmarkDotNet.Attributes; +using Microsoft.Extensions.Logging.Abstractions; +using NetDaemon.HassModel.Internal; + +namespace NetDaemon.PerformanceBenchmarks.Benchmarks; + +[MemoryDiagnoser] +public class QueuedObservableBenchmarks +{ + private const int EventCount = 10_000; + + [Params(1, 10, 50)] + public int AppScopes { get; set; } + + [Benchmark] + public async Task FanOutToAppScopedQueues() + { + using var source = new Subject(); + var queues = new QueuedObservable[AppScopes]; + + for (var i = 0; i < queues.Length; i++) + { + queues[i] = new QueuedObservable(source, NullLogger.Instance); + queues[i].Subscribe(static _ => { }); + } + + for (var i = 0; i < EventCount; i++) + { + source.OnNext(i); + } + + foreach (var queue in queues) + { + await queue.DisposeAsync().ConfigureAwait(false); + } + } +} diff --git a/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/ResultDispatchBenchmarks.cs b/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/ResultDispatchBenchmarks.cs new file mode 100644 index 000000000..629bf57e5 --- /dev/null +++ b/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/ResultDispatchBenchmarks.cs @@ -0,0 +1,57 @@ +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Reactive.Threading.Tasks; +using System.Text.Json; +using BenchmarkDotNet.Attributes; +using NetDaemon.Client.Internal.HomeAssistant.Commands; +using NetDaemon.PerformanceBenchmarks.Support; + +namespace NetDaemon.PerformanceBenchmarks.Benchmarks; + +[MemoryDiagnoser] +public class ResultDispatchBenchmarks +{ + private HassMessage[] _results = []; + + [Params(1, 10, 100, 1000)] + public int PendingCommands { get; set; } + + [GlobalSetup] + public void Setup() + { + _results = Enumerable.Range(1, PendingCommands) + .Select(id => JsonSerializer.Deserialize(HassPayloadFactory.ResultMessage(id))!) + .ToArray(); + } + + [Benchmark(Baseline = true)] + public async Task PerCommandRxFilterSubscriptions() + { + using var subject = new Subject(); + var tasks = Enumerable.Range(1, PendingCommands) + .Select(id => subject.Where(n => n.Type == "result" && n.Id == id).FirstAsync().ToTask()) + .ToArray(); + + foreach (var result in _results) + { + subject.OnNext(result); + } + + await Task.WhenAll(tasks).ConfigureAwait(false); + } + + [Benchmark] + public void DictionaryResultDispatch() + { + var pending = Enumerable.Range(1, PendingCommands) + .ToDictionary(static id => id, static _ => new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)); + + foreach (var result in _results) + { + if (pending.Remove(result.Id, out var completionSource)) + { + completionSource.SetResult(result); + } + } + } +} diff --git a/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/StateChangeJsonBenchmarks.cs b/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/StateChangeJsonBenchmarks.cs new file mode 100644 index 000000000..91e8cd78a --- /dev/null +++ b/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/StateChangeJsonBenchmarks.cs @@ -0,0 +1,34 @@ +using System.Text.Json; +using BenchmarkDotNet.Attributes; +using NetDaemon.Client.HomeAssistant.Model; +using NetDaemon.Client.Internal.HomeAssistant.Commands; +using NetDaemon.PerformanceBenchmarks.Support; + +namespace NetDaemon.PerformanceBenchmarks.Benchmarks; + +[MemoryDiagnoser] +public class StateChangeJsonBenchmarks +{ + private readonly JsonElement _eventData; + + public StateChangeJsonBenchmarks() + { + var message = JsonSerializer.Deserialize(HassPayloadFactory.StateChangedEvent(1)) + ?? throw new InvalidOperationException("Failed to create benchmark event"); + _eventData = message.Event!.DataElement!.Value; + } + + [Benchmark(Baseline = true)] + public string? ExtractEntityIdAndLazyNewState() + { + var entityId = _eventData.GetProperty("entity_id").GetString(); + _ = _eventData.GetProperty("new_state"); + return entityId; + } + + [Benchmark] + public HassState? ForceDeserializeNewState() + { + return _eventData.GetProperty("new_state").Deserialize(); + } +} diff --git a/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/WebSocketPipelineBenchmarks.cs b/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/WebSocketPipelineBenchmarks.cs new file mode 100644 index 000000000..0643c61c4 --- /dev/null +++ b/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/WebSocketPipelineBenchmarks.cs @@ -0,0 +1,44 @@ +using BenchmarkDotNet.Attributes; +using NetDaemon.Client.Internal.HomeAssistant.Commands; +using NetDaemon.Client.Internal.Net; +using NetDaemon.PerformanceBenchmarks.Support; + +namespace NetDaemon.PerformanceBenchmarks.Benchmarks; + +[MemoryDiagnoser] +public class WebSocketPipelineBenchmarks +{ + private const int CoalescedEventCount = 64; + private readonly string _singleEvent = HassPayloadFactory.StateChangedEvent(1); + private readonly string _coalescedEvents = HassPayloadFactory.CoalescedStateChangedEvents(CoalescedEventCount); + private readonly object _serviceCommand = new { id = 42, type = "call_service", domain = "light", service = "turn_on" }; + + [Benchmark(Baseline = true)] + public async Task ReadSingleEvent() + { + var websocket = new BenchmarkWebSocketClient(); + websocket.EnqueueJson(_singleEvent); + var pipeline = new WebSocketClientTransportPipeline(websocket); + + return await pipeline.GetNextMessagesAsync(CancellationToken.None).ConfigureAwait(false); + } + + [Benchmark] + public async Task ReadCoalescedEvents() + { + var websocket = new BenchmarkWebSocketClient(); + websocket.EnqueueJson(_coalescedEvents); + var pipeline = new WebSocketClientTransportPipeline(websocket); + + return await pipeline.GetNextMessagesAsync(CancellationToken.None).ConfigureAwait(false); + } + + [Benchmark] + public async Task SendServiceCommand() + { + var websocket = new BenchmarkWebSocketClient(); + var pipeline = new WebSocketClientTransportPipeline(websocket); + + await pipeline.SendMessageAsync(_serviceCommand, CancellationToken.None).ConfigureAwait(false); + } +} diff --git a/benchmarks/NetDaemon.PerformanceBenchmarks/NetDaemon.PerformanceBenchmarks.csproj b/benchmarks/NetDaemon.PerformanceBenchmarks/NetDaemon.PerformanceBenchmarks.csproj new file mode 100644 index 000000000..cdec5e74d --- /dev/null +++ b/benchmarks/NetDaemon.PerformanceBenchmarks/NetDaemon.PerformanceBenchmarks.csproj @@ -0,0 +1,21 @@ + + + + Exe + false + enable + enable + + + + + + + + + + + + + + diff --git a/benchmarks/NetDaemon.PerformanceBenchmarks/Program.cs b/benchmarks/NetDaemon.PerformanceBenchmarks/Program.cs new file mode 100644 index 000000000..212887bdc --- /dev/null +++ b/benchmarks/NetDaemon.PerformanceBenchmarks/Program.cs @@ -0,0 +1,7 @@ +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Running; + +var artifactsPath = Path.Combine(AppContext.BaseDirectory, "BenchmarkDotNet.Artifacts"); +var config = DefaultConfig.Instance.WithArtifactsPath(artifactsPath); + +BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args, config); diff --git a/benchmarks/NetDaemon.PerformanceBenchmarks/Support/BenchmarkWebSocketClient.cs b/benchmarks/NetDaemon.PerformanceBenchmarks/Support/BenchmarkWebSocketClient.cs new file mode 100644 index 000000000..0eeabf51f --- /dev/null +++ b/benchmarks/NetDaemon.PerformanceBenchmarks/Support/BenchmarkWebSocketClient.cs @@ -0,0 +1,77 @@ +using System.Net.WebSockets; +using System.Text; +using NetDaemon.Client.Internal.Net; + +namespace NetDaemon.PerformanceBenchmarks.Support; + +internal sealed class BenchmarkWebSocketClient : IWebSocketClient +{ + private readonly Queue _responses = new(); + private byte[]? _currentResponse; + private int _currentResponseOffset; + + public WebSocketState State { get; private set; } = WebSocketState.Open; + + public WebSocketCloseStatus? CloseStatus { get; private set; } + + public ValueTask DisposeAsync() + { + State = WebSocketState.Closed; + return ValueTask.CompletedTask; + } + + public Task ConnectAsync(Uri uri, CancellationToken cancellationToken) => Task.CompletedTask; + + public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken) + { + CloseStatus = closeStatus; + State = WebSocketState.Closed; + return Task.CompletedTask; + } + + public Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken) + { + CloseStatus = closeStatus; + State = WebSocketState.Closed; + return Task.CompletedTask; + } + + public Task SendAsync(ArraySegment buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) + { + SentBytes += buffer.Count; + SentMessages++; + return Task.CompletedTask; + } + + public ValueTask SendAsync(ReadOnlyMemory buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) + { + SentBytes += buffer.Length; + SentMessages++; + return ValueTask.CompletedTask; + } + + public ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken) + { + _currentResponse ??= _responses.Dequeue(); + + var remaining = _currentResponse.Length - _currentResponseOffset; + var count = Math.Min(remaining, buffer.Length); + _currentResponse.AsMemory(_currentResponseOffset, count).CopyTo(buffer); + _currentResponseOffset += count; + + var endOfMessage = _currentResponseOffset == _currentResponse.Length; + if (endOfMessage) + { + _currentResponse = null; + _currentResponseOffset = 0; + } + + return ValueTask.FromResult(new ValueWebSocketReceiveResult(count, WebSocketMessageType.Text, endOfMessage)); + } + + public int SentMessages { get; private set; } + + public int SentBytes { get; private set; } + + public void EnqueueJson(string json) => _responses.Enqueue(Encoding.UTF8.GetBytes(json)); +} diff --git a/benchmarks/NetDaemon.PerformanceBenchmarks/Support/HassPayloadFactory.cs b/benchmarks/NetDaemon.PerformanceBenchmarks/Support/HassPayloadFactory.cs new file mode 100644 index 000000000..b61a93a8f --- /dev/null +++ b/benchmarks/NetDaemon.PerformanceBenchmarks/Support/HassPayloadFactory.cs @@ -0,0 +1,64 @@ +using System.Text; + +namespace NetDaemon.PerformanceBenchmarks.Support; + +internal static class HassPayloadFactory +{ + public static string StateChangedEvent(int index) + { + return $$""" + { + "id": 2, + "type": "event", + "event": { + "event_type": "state_changed", + "data": { + "entity_id": "sensor.perf_{{index}}", + "old_state": { + "entity_id": "sensor.perf_{{index}}", + "state": "{{index - 1}}", + "attributes": { "unit_of_measurement": "W", "friendly_name": "Perf {{index}}" }, + "last_changed": "2026-06-18T10:00:00Z", + "last_updated": "2026-06-18T10:00:00Z" + }, + "new_state": { + "entity_id": "sensor.perf_{{index}}", + "state": "{{index}}", + "attributes": { "unit_of_measurement": "W", "friendly_name": "Perf {{index}}" }, + "last_changed": "2026-06-18T10:00:01Z", + "last_updated": "2026-06-18T10:00:01Z", + "context": { "id": "ctx-{{index}}", "parent_id": null, "user_id": null } + } + }, + "origin": "LOCAL", + "time_fired": "2026-06-18T10:00:01Z" + } + } + """; + } + + public static string CoalescedStateChangedEvents(int count) + { + var builder = new StringBuilder(count * 700); + builder.Append('['); + for (var i = 0; i < count; i++) + { + if (i > 0) builder.Append(','); + builder.Append(StateChangedEvent(i)); + } + builder.Append(']'); + return builder.ToString(); + } + + public static string ResultMessage(int id) + { + return $$""" + { + "id": {{id}}, + "type": "result", + "success": true, + "result": { "ok": true } + } + """; + } +} diff --git a/docs/performance/high-event-review.md b/docs/performance/high-event-review.md new file mode 100644 index 000000000..3f36e66b3 --- /dev/null +++ b/docs/performance/high-event-review.md @@ -0,0 +1,95 @@ +# High-Event Home Assistant Performance Review + +This review is scoped to realistic high-event Home Assistant workloads: websocket event ingest, command/result latency, state cache updates, Rx fan-out, and service-call behavior. It intentionally ignores changes that only improve cold paths or trim tiny CPU costs that disappear behind network latency. + +## How To Reproduce The Measurements + +Use the isolated benchmark project: + +```bash +dotnet run -c Release --project benchmarks/NetDaemon.PerformanceBenchmarks -- --filter '*' +``` + +Useful focused runs: + +```bash +dotnet run -c Release --project benchmarks/NetDaemon.PerformanceBenchmarks -- --filter '*WebSocketPipeline*' +dotnet run -c Release --project benchmarks/NetDaemon.PerformanceBenchmarks -- --filter '*ResultDispatch*' +dotnet run -c Release --project benchmarks/NetDaemon.PerformanceBenchmarks -- --filter '*QueuedObservable*' +dotnet run -c Release --project benchmarks/NetDaemon.PerformanceBenchmarks -- --filter '*StateChangeJson*' +``` + +For quick validation without full benchmark confidence: + +```bash +dotnet run -c Release --project benchmarks/NetDaemon.PerformanceBenchmarks -- --filter '*' --job Dry +``` + +The dry run was used to verify that all 16 benchmark cases execute end-to-end. Do not use dry-run timings as decision-grade performance numbers; run the default BenchmarkDotNet job for comparisons that will drive code changes. + +Benchmark reports are written under the benchmark project's build output directory so normal runs do not create root-level repo artifacts. + +## Ranked Findings + +1. **Result dispatch scales with every pending command** + + Evidence: `HomeAssistantConnection.SendCommandAsyncInternal` creates a filtered Rx subscription for each command waiting for a `result` message. Every incoming result flows through the shared `Subject` and each active predicate checks `Type` and `Id`. + + Why it matters: service calls are network-bound individually, but Home Assistant automations can issue bursts. Under high in-flight command counts, result correlation should be near O(1) per result; the current shape tends toward O(pending commands). + + Recommendation: replace the per-command filtered subscription path with an internal `ConcurrentDictionary>` keyed by command id. In the receive loop, route `type == "result"` messages directly to the matching completion source and publish non-result/event messages through the existing observable path. Keep public APIs unchanged. + + Risk: medium. This touches command completion, timeout, disposal, and error logging semantics. Add tests for success, HA error result, timeout, cancellation, disposal with pending commands, and out-of-order results. + +2. **Websocket receive currently double-deserializes message payloads** + + Evidence: `WebSocketTransportPipeline.ReadMessagesFromPipelineAndSerializeAsync` deserializes the pipe to `JsonElement?`, then deserializes that element again to either `T` or `T[]`. + + Why it matters: this is in the core event ingest path. Coalesced HA event arrays multiply the cost because the whole payload becomes a DOM before the typed messages are created. + + Recommendation: benchmark and prototype a direct parse path that uses `JsonDocument.ParseAsync` only to detect object versus array, then deserializes from the original buffered bytes, or accumulates the websocket message into pooled memory and calls `JsonSerializer.Deserialize`/`Deserialize` once. Keep chunked message support and close/cancellation behavior intact. + + Risk: medium. The pipe-based implementation is currently simple and handles chunking well; a replacement must prove lower allocations without regressing large messages or close-frame handling. + +3. **Per-app event queues multiply work for all-event subscriptions** + + Evidence: each `AppScopedHaContextProvider` creates a `QueuedObservable` over `EntityStateCache.AllEvents`. Each scope subscribes to every HA event before app-level filters such as `StateAllChanges` and `Events.Filter` run. + + Why it matters: the design protects apps from each other's slow observers, which is valuable, but high app counts turn one HA event into N channel writes and N queue drains even when most apps only care about a narrow event type. + + Recommendation: keep per-app isolation, but evaluate moving common filters before queueing for common APIs. Candidate: provide internal typed/event-filtered queued streams for `state_changed` and trigger messages so apps that only use state changes do not queue unrelated HA events. + + Risk: medium-high. Event ordering and isolation are user-visible automation semantics. Require latency/drop benchmarks with 1, 10, and 50 app scopes plus one slow observer before changing behavior. + +4. **Bounded queue drops are only logged, not observable** + + Evidence: `QueuedObservable` uses a bounded channel with capacity 1024, logs near 90%, and drops events when `TryWrite` fails. + + Why it matters: dropping is a reasonable protection against slow automations, but in real HA event storms this can silently degrade automation correctness beyond logs. + + Recommendation: add internal counters or structured diagnostics for near-full and dropped events. Consider configurable capacity only after benchmarked evidence that realistic workloads exceed 1024 with healthy handlers. + + Risk: low for diagnostics, medium for policy changes. Avoid changing drop/backpressure behavior without compatibility discussion. + +5. **Registry update handling does full reloads** + + Evidence: `RegistryCache` subscribes to registry update events and reloads whole entity/device/area/label/floor registries after throttling. + + Why it matters: this is secondary for high-event steady state, but large HA installations can make reload bursts visible, and reloads use network calls that can contend with command traffic. + + Recommendation: leave this behind the core event path. If benchmarks or user traces show registry churn, inspect HA registry update payloads and switch only safe cases to incremental updates. + + Risk: medium. Incremental cache invalidation can be subtle; full reload is robust. + +## Benchmark Coverage Added + +- `WebSocketPipelineBenchmarks`: single event, coalesced events, and send serialization. +- `ResultDispatchBenchmarks`: current per-command Rx filtered subscriptions versus ID-indexed dictionary dispatch. +- `QueuedObservableBenchmarks`: fan-out cost across 1, 10, and 50 app-scoped queues. +- `StateChangeJsonBenchmarks`: lazy state-change cache path versus forced `HassState` deserialization. + +## Acceptance Criteria For Future Fixes + +- Maintain public API compatibility unless a diagnostic-only option is explicitly accepted. +- Improve hot-path p95 latency, allocation rate, queue drops, or throughput by a meaningful margin in the benchmark harness. +- Preserve websocket chunking/coalescing support, event ordering within an app scope, slow-subscriber isolation, and command timeout/error semantics. diff --git a/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj b/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj index ee209ab93..9b025620a 100644 --- a/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj +++ b/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj @@ -37,6 +37,12 @@ 1701;1702;IL2121;CS1591 + + + <_Parameter1>NetDaemon.PerformanceBenchmarks + + + diff --git a/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj b/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj index 9ed4b747d..6d29fa52a 100644 --- a/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj +++ b/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj @@ -44,6 +44,9 @@ <_Parameter1>DynamicProxyGenAssembly2 + + <_Parameter1>NetDaemon.PerformanceBenchmarks + From a6ceed159223d04e04e797951540de66ac6510c8 Mon Sep 17 00:00:00 2001 From: Tomas Date: Thu, 18 Jun 2026 23:25:56 +0200 Subject: [PATCH 2/3] Improve high-event hot paths --- .../Benchmarks/StateChangeJsonBenchmarks.cs | 21 ++- docs/performance/high-event-review.md | 62 ++++++++ .../HomeAssistantConnectionTests.cs | 114 ++++++++++++++- .../Internal/HomeAssistantConnection.cs | 56 +++++-- .../Net/WebSocketTransportPipeline.cs | 137 +++++++----------- 5 files changed, 290 insertions(+), 100 deletions(-) diff --git a/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/StateChangeJsonBenchmarks.cs b/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/StateChangeJsonBenchmarks.cs index 91e8cd78a..c95ac826b 100644 --- a/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/StateChangeJsonBenchmarks.cs +++ b/benchmarks/NetDaemon.PerformanceBenchmarks/Benchmarks/StateChangeJsonBenchmarks.cs @@ -1,21 +1,20 @@ using System.Text.Json; using BenchmarkDotNet.Attributes; using NetDaemon.Client.HomeAssistant.Model; -using NetDaemon.Client.Internal.HomeAssistant.Commands; using NetDaemon.PerformanceBenchmarks.Support; namespace NetDaemon.PerformanceBenchmarks.Benchmarks; [MemoryDiagnoser] -public class StateChangeJsonBenchmarks +public class StateChangeJsonBenchmarks : IDisposable { + private readonly JsonDocument _document; private readonly JsonElement _eventData; public StateChangeJsonBenchmarks() { - var message = JsonSerializer.Deserialize(HassPayloadFactory.StateChangedEvent(1)) - ?? throw new InvalidOperationException("Failed to create benchmark event"); - _eventData = message.Event!.DataElement!.Value; + _document = JsonDocument.Parse(HassPayloadFactory.StateChangedEvent(1)); + _eventData = _document.RootElement.GetProperty("event").GetProperty("data"); } [Benchmark(Baseline = true)] @@ -31,4 +30,16 @@ public StateChangeJsonBenchmarks() { return _eventData.GetProperty("new_state").Deserialize(); } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + _document.Dispose(); + } } diff --git a/docs/performance/high-event-review.md b/docs/performance/high-event-review.md index 3f36e66b3..72a7afd1b 100644 --- a/docs/performance/high-event-review.md +++ b/docs/performance/high-event-review.md @@ -88,6 +88,68 @@ Benchmark reports are written under the benchmark project's build output directo - `QueuedObservableBenchmarks`: fan-out cost across 1, 10, and 50 app-scoped queues. - `StateChangeJsonBenchmarks`: lazy state-change cache path versus forced `HassState` deserialization. +## Implemented Hot-Path Improvements + +Baseline commit: `d1eb2dd4 Add high-event performance benchmark baseline`. + +Post-change benchmark command: + +```bash +dotnet run -c Release --project benchmarks/NetDaemon.PerformanceBenchmarks -- --filter '*' +``` + +Benchmark reports: + +- `benchmarks/NetDaemon.PerformanceBenchmarks/bin/Release/net10.0/BenchmarkDotNet.Artifacts/results/NetDaemon.PerformanceBenchmarks.Benchmarks.ResultDispatchBenchmarks-report-github.md` +- `benchmarks/NetDaemon.PerformanceBenchmarks/bin/Release/net10.0/BenchmarkDotNet.Artifacts/results/NetDaemon.PerformanceBenchmarks.Benchmarks.WebSocketPipelineBenchmarks-report-github.md` +- `benchmarks/NetDaemon.PerformanceBenchmarks/bin/Release/net10.0/BenchmarkDotNet.Artifacts/results/NetDaemon.PerformanceBenchmarks.Benchmarks.StateChangeJsonBenchmarks-report-github.md` +- `benchmarks/NetDaemon.PerformanceBenchmarks/bin/Release/net10.0/BenchmarkDotNet.Artifacts/results/NetDaemon.PerformanceBenchmarks.Benchmarks.QueuedObservableBenchmarks-report-github.md` + +### Result Dispatch + +`HomeAssistantConnection` now tracks pending result waiters in an ID-indexed `ConcurrentDictionary>`. The receive loop completes matching `result` messages directly and still publishes every raw `HassMessage` to `OnHassMessage`. + +| Pending commands | Baseline Rx dispatch | Implemented dictionary path | Mean improvement | Allocation improvement | +|-----------------:|---------------------:|----------------------------:|-----------------:|-----------------------:| +| 1 | 215.94 ns, 880 B | 44.92 ns, 352 B | 79% faster | 60% lower | +| 10 | 2,081.51 ns, 6,424 B | 326.47 ns, 1,360 B | 84% faster | 79% lower | +| 100 | 29,398.81 ns, 133,144 B | 3,282.14 ns, 12,688 B | 89% faster | 90% lower | +| 1000 | 1,767,235.91 ns, 8,528,344 B | 27,993.05 ns, 126,976 B | 98% faster | 99% lower | + +Behavior covered by tests: + +- concurrent commands complete from matching result IDs when results arrive out of order +- raw `result` messages are still published to `IHomeAssistantHassMessages.OnHassMessage` +- send failures remove pending result state and do not break later commands +- disposing the connection cancels pending result waiters + +### Websocket Receive + +`WebSocketClientTransportPipeline` now reads a complete websocket message into a byte buffer, checks whether the payload is a JSON object or array, and deserializes directly to `T` or `T[]`. This removes the previous `JsonElement` intermediate followed by a second typed deserialization. + +| Scenario | Baseline | Post-change | Result | +|---------|---------:|------------:|-------:| +| Single event read | 6,638.9 ns, 7.2 KB | 2,054.0 ns, 5,600 B | 69% faster, lower allocation | +| 64 coalesced events | 306,653.5 ns, 259.98 KB | 111,851.9 ns, 291,785 B | 64% faster, about 10% more allocation | +| Send service command | 548.0 ns, 1.07 KB | 605.0 ns, 646 B | effectively unchanged for this receive-side change | + +The coalesced-event receive path is the meaningful win here because it is the high-event ingest case. The allocation increase should be watched if this path is tuned further; the latency reduction is large enough to keep this implementation. + +### State-Change Benchmark Harness + +The initial baseline run failed `StateChangeJsonBenchmarks.ExtractEntityIdAndLazyNewState` because the benchmark stored a `JsonElement` after its owning document had gone out of scope. The benchmark now keeps the owning `JsonDocument` alive for the benchmark lifetime. + +Post-fix measurements: + +| Scenario | Mean | Allocated | +|---------|-----:|----------:| +| Extract entity id and lazy `new_state` element | 31.28 ns | 48 B | +| Force deserialize `new_state` | 505.98 ns | 952 B | + +The fixed benchmark confirms the current lazy state-change path avoids most of the deserialization cost until an observer asks for typed state. + +These measurements are local-machine comparisons on this workstation. Treat them as change-direction evidence for this codebase, not as cross-machine absolute performance truth. + ## Acceptance Criteria For Future Fixes - Maintain public API compatibility unless a diagnostic-only option is explicitly accepted. diff --git a/src/Client/NetDaemon.HassClient.Tests/HomeAssistantClientTest/HomeAssistantConnectionTests.cs b/src/Client/NetDaemon.HassClient.Tests/HomeAssistantClientTest/HomeAssistantConnectionTests.cs index 4e5f1b15a..81a69faa4 100644 --- a/src/Client/NetDaemon.HassClient.Tests/HomeAssistantClientTest/HomeAssistantConnectionTests.cs +++ b/src/Client/NetDaemon.HassClient.Tests/HomeAssistantClientTest/HomeAssistantConnectionTests.cs @@ -6,7 +6,11 @@ public class HomeAssistantConnectionTests { private static IHomeAssistantConnection GetDefaultHomeAssistantConnection() { - var pipeline = new TransportPipelineMock(); + return CreateHomeAssistantConnection(new TransportPipelineMock()); + } + + private static HomeAssistantConnection CreateHomeAssistantConnection(TransportPipelineMock pipeline) + { pipeline.Setup(n => n.WebSocketState).Returns(WebSocketState.Open); var apiManagerMock = new Mock(); var loggerMock = new Mock>(); @@ -62,4 +66,112 @@ public async Task HomeAssistantConnectionDisposedMultipleTimesShouldNotThrow() await homeAssistantConnection!.DisposeAsync(); await homeAssistantConnection!.DisposeAsync(); } + + [Fact] + public async Task ConcurrentCommandsShouldCompleteFromMatchingResultIds() + { + var pipeline = new TransportPipelineMock(); + var sentCommandIds = Channel.CreateUnbounded(); + pipeline + .Setup(n => n.SendMessageAsync(It.IsAny(), It.IsAny())) + .Returns((command, _) => + { + sentCommandIds.Writer.TryWrite(command.Id); + return Task.CompletedTask; + }); + + await using var homeAssistantConnection = CreateHomeAssistantConnection(pipeline); + var commands = Enumerable.Range(0, 3).Select(_ => new FakeCommand()).ToArray(); + var resultTasks = commands + .Select(command => homeAssistantConnection.SendCommandAndReturnHassMessageResponseAsync(command, CancellationToken.None)) + .ToArray(); + + for (var i = 0; i < commands.Length; i++) + { + _ = await sentCommandIds.Reader.ReadAsync(CancellationToken.None); + } + + foreach (var command in commands.Reverse()) + { + pipeline.AddResponse(new HassMessage { Type = "result", Id = command.Id, Success = true }); + } + + var results = await Task.WhenAll(resultTasks).WaitAsync(TimeSpan.FromSeconds(5)); + + results.Select(result => result!.Id).Should().Equal(commands.Select(command => command.Id)); + } + + [Fact] + public async Task ResultMessagesShouldStillBePublishedToRawHassMessageSubscribers() + { + var pipeline = new TransportPipelineMock(); + pipeline + .Setup(n => n.SendMessageAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + await using var homeAssistantConnection = CreateHomeAssistantConnection(pipeline); + var rawMessages = (homeAssistantConnection as IHomeAssistantHassMessages)!; + var rawResultTask = rawMessages.OnHassMessage + .Where(message => message.Type == "result") + .FirstAsync() + .ToTask(); + + var command = new FakeCommand(); + var resultTask = homeAssistantConnection.SendCommandAndReturnHassMessageResponseAsync(command, CancellationToken.None); + + pipeline.AddResponse(new HassMessage { Type = "result", Id = command.Id, Success = true }); + + var result = await resultTask.WaitAsync(TimeSpan.FromSeconds(5)); + var rawResult = await rawResultTask.WaitAsync(TimeSpan.FromSeconds(5)); + + result!.Id.Should().Be(command.Id); + rawResult.Id.Should().Be(command.Id); + } + + [Fact] + public async Task SendFailureShouldFaultResultWaiterWithoutBreakingLaterCommands() + { + var pipeline = new TransportPipelineMock(); + var firstSend = true; + pipeline + .Setup(n => n.SendMessageAsync(It.IsAny(), It.IsAny())) + .Returns((_, _) => + { + if (!firstSend) + return Task.CompletedTask; + + firstSend = false; + throw new InvalidOperationException("send failed"); + }); + + await using var homeAssistantConnection = CreateHomeAssistantConnection(pipeline); + + await Assert.ThrowsAsync( + () => homeAssistantConnection.SendCommandAndReturnHassMessageResponseAsync(new FakeCommand(), CancellationToken.None)); + + var command = new FakeCommand(); + var resultTask = homeAssistantConnection.SendCommandAndReturnHassMessageResponseAsync(command, CancellationToken.None); + pipeline.AddResponse(new HassMessage { Type = "result", Id = command.Id, Success = true }); + + var result = await resultTask.WaitAsync(TimeSpan.FromSeconds(5)); + + result!.Id.Should().Be(command.Id); + } + + [Fact] + public async Task DisposingConnectionShouldCancelPendingResultWaiters() + { + var pipeline = new TransportPipelineMock(); + pipeline + .Setup(n => n.SendMessageAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + var homeAssistantConnection = CreateHomeAssistantConnection(pipeline); + var resultTask = homeAssistantConnection.SendCommandAndReturnHassMessageResponseAsync(new FakeCommand(), CancellationToken.None); + + await homeAssistantConnection.DisposeAsync(); + + await Assert.ThrowsAnyAsync( + () => resultTask.WaitAsync(TimeSpan.FromSeconds(5))); + } } diff --git a/src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs b/src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs index 6b8c1aa4b..840b1d5c7 100644 --- a/src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs +++ b/src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs @@ -1,3 +1,5 @@ +using System.Collections.Concurrent; + namespace NetDaemon.Client.Internal; internal class HomeAssistantConnection : IHomeAssistantConnection, IHomeAssistantHassMessages @@ -13,6 +15,7 @@ internal class HomeAssistantConnection : IHomeAssistantConnection, IHomeAssistan private readonly CancellationTokenSource _internalCancelSource = new(); private readonly Subject _hassMessageSubject = new(); + private readonly ConcurrentDictionary> _pendingResults = new(); private readonly Task _handleNewMessagesTask; private const int WaitForResultTimeout = 20000; @@ -148,6 +151,7 @@ public async ValueTask DisposeAsync() if (!_internalCancelSource.IsCancellationRequested) await _internalCancelSource.CancelAsync(); + CancelPendingResults(); // Gracefully wait for task or timeout await Task.WhenAny( @@ -189,17 +193,31 @@ private async Task> SendCommandAsyncInternal(T command, Can // increasing the messageId and Sending the message command.Id = ++_messageId; - // We make a task that subscribe for the return result message - // this task will be returned and handled by caller - var resultEvent = _hassMessageSubject - .Where(n => n.Type == "result" && n.Id == command.Id) - .FirstAsync().ToTask(CancellationToken.None); - // We dont want to pass the incoming CancellationToken here because it will throw a TaskCanceledException - // when calling services from an Apps Dispose(Async) and hide possible actual exceptions + // Complete result waiters directly from the receive loop instead of creating + // one filtered Rx subscription per command. + var resultEvent = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + if (!_pendingResults.TryAdd(command.Id, resultEvent)) + throw new InvalidOperationException($"A command with id {command.Id} is already pending"); - await _transportPipeline.SendMessageAsync(command, cancelToken); + try + { + await _transportPipeline.SendMessageAsync(command, cancelToken); + } + catch (OperationCanceledException) + { + if (_pendingResults.TryRemove(command.Id, out var pendingResult)) + pendingResult.TrySetCanceled(cancelToken); + throw; + } + catch (Exception e) + { + if (_pendingResults.TryRemove(command.Id, out var pendingResult)) + pendingResult.TrySetException(e); + throw; + } - return resultEvent; + return resultEvent.Task; } finally { @@ -219,7 +237,7 @@ private async Task HandleNewMessages() { foreach (var obj in msg) { - _hassMessageSubject.OnNext(obj); + HandleIncomingMessage(obj); } } catch (Exception e) @@ -235,12 +253,30 @@ private async Task HandleNewMessages() finally { _logger.LogTrace("Stop processing new messages"); + CancelPendingResults(); // make sure we always cancel any blocking operations if (!_internalCancelSource.IsCancellationRequested) await _internalCancelSource.CancelAsync(); } } + private void HandleIncomingMessage(HassMessage message) + { + if (message.Type == "result" && _pendingResults.TryRemove(message.Id, out var completionSource)) + completionSource.TrySetResult(message); + + _hassMessageSubject.OnNext(message); + } + + private void CancelPendingResults() + { + foreach (var pendingResult in _pendingResults) + { + if (_pendingResults.TryRemove(pendingResult.Key, out var completionSource)) + completionSource.TrySetCanceled(); + } + } + private Task CloseAsync() { return _transportPipeline.CloseAsync(); diff --git a/src/Client/NetDaemon.HassClient/Internal/Net/WebSocketTransportPipeline.cs b/src/Client/NetDaemon.HassClient/Internal/Net/WebSocketTransportPipeline.cs index 6367f71ae..17e8d3101 100644 --- a/src/Client/NetDaemon.HassClient/Internal/Net/WebSocketTransportPipeline.cs +++ b/src/Client/NetDaemon.HassClient/Internal/Net/WebSocketTransportPipeline.cs @@ -1,3 +1,5 @@ +using System.Buffers; + namespace NetDaemon.Client.Internal.Net; internal class WebSocketClientTransportPipeline(IWebSocketClient clientWebSocket) : IWebSocketClientTransportPipeline @@ -12,7 +14,6 @@ internal class WebSocketClientTransportPipeline(IWebSocketClient clientWebSocket }; private readonly CancellationTokenSource _internalCancelSource = new(); - private readonly Pipe _pipe = new(); private readonly IWebSocketClient _ws = clientWebSocket ?? throw new ArgumentNullException(nameof(clientWebSocket)); private static int DefaultTimeOut => 5000; @@ -50,24 +51,13 @@ public async ValueTask GetNextMessagesAsync(CancellationToken cancelToke _internalCancelSource.Token, cancelToken ); - try - { - // First we start the serialization task that will process - // the pipeline for new data written from websocket input - // We want the processing to start before we read data - // from the websocket so the pipeline is not getting full - var serializeTask = ReadMessagesFromPipelineAndSerializeAsync(combinedTokenSource.Token); - await ReadMessageFromWebSocketAndWriteToPipelineAsync(combinedTokenSource.Token).ConfigureAwait(false); - var result = await serializeTask.ConfigureAwait(false); - // File.WriteAllText("./json_result.json", JsonSerializer.Serialize(result, _defaultSerializerOptions)); - // We need to make sure the serialize task is finished before we throw the exception - combinedTokenSource.Token.ThrowIfCancellationRequested(); - return result; - } - finally - { - _pipe.Reset(); - } + var messageBuffer = new ArrayBufferWriter(); + await ReadMessageFromWebSocketAsync(messageBuffer, combinedTokenSource.Token).ConfigureAwait(false); + combinedTokenSource.Token.ThrowIfCancellationRequested(); + var result = DeserializeMessages(messageBuffer.WrittenSpan); + // File.WriteAllText("./json_result.json", JsonSerializer.Serialize(result, _defaultSerializerOptions)); + combinedTokenSource.Token.ThrowIfCancellationRequested(); + return result; } public Task SendMessageAsync(T message, CancellationToken cancelToken) where T : class @@ -86,89 +76,68 @@ public Task SendMessageAsync(T message, CancellationToken cancelToken) where return _ws.SendAsync(result, WebSocketMessageType.Text, true, combinedTokenSource.Token); } - /// - /// Continuously reads the data from the pipe and serialize to object - /// from the json that are read - /// - /// Cancellation token - /// The type to serialize to - private async Task ReadMessagesFromPipelineAndSerializeAsync(CancellationToken cancelToken) + private static T[] DeserializeMessages(ReadOnlySpan payload) + where T : class { - try - { - var message = await JsonSerializer.DeserializeAsync(_pipe.Reader.AsStream(), - cancellationToken: cancelToken).ConfigureAwait(false) - ?? throw new ApplicationException( - "Deserialization of websocket returned empty result (null)"); - if (message.ValueKind == JsonValueKind.Array) - { - // This is a coalesced message containing multiple messages so we need to - // deserialize it as an array - return message.Deserialize() ?? throw new ApplicationException( - "Deserialization of websocket returned empty result (null)"); - } - else - { - // This is normal message and we deserialize it as object - var obj = message.Deserialize() ?? throw new ApplicationException( - "Deserialization of websocket returned empty result (null)"); - return [obj]; - } - } - finally + if (payload.IsEmpty) + throw new ApplicationException("Deserialization of websocket returned empty result (null)"); + + if (IsJsonArray(payload)) + return JsonSerializer.Deserialize(payload) ?? throw new ApplicationException( + "Deserialization of websocket returned empty result (null)"); + + var obj = JsonSerializer.Deserialize(payload) ?? throw new ApplicationException( + "Deserialization of websocket returned empty result (null)"); + return [obj]; + } + + private static bool IsJsonArray(ReadOnlySpan payload) + { + foreach (var value in payload) { - // Always complete the reader - await _pipe.Reader.CompleteAsync().ConfigureAwait(false); + if (value is (byte)' ' or (byte)'\n' or (byte)'\r' or (byte)'\t') + continue; + + return value == (byte)'['; } + + throw new ApplicationException("Deserialization of websocket returned empty result (null)"); } /// /// Read one or more chunks of a message and writes the result - /// to the pipeline + /// to the buffer /// /// /// A websocket message can be 1 to several chunks of data. - /// As data are read it is written on the pipeline for - /// the json serializer in function ReadMessageFromPipelineAndSerializeAsync - /// to continuously serialize. Using pipes is very efficient - /// way to reuse memory and get speedy results + /// As data are read it is written to a contiguous buffer so we can inspect + /// the first token and deserialize directly to either a single message or + /// a coalesced message array. /// - private async Task ReadMessageFromWebSocketAndWriteToPipelineAsync(CancellationToken cancelToken) + private async Task ReadMessageFromWebSocketAsync(ArrayBufferWriter messageBuffer, CancellationToken cancelToken) { - try + while (!cancelToken.IsCancellationRequested && !_ws.CloseStatus.HasValue) { - while (!cancelToken.IsCancellationRequested && !_ws.CloseStatus.HasValue) + var memory = messageBuffer.GetMemory(); + var result = await _ws.ReceiveAsync(memory, cancelToken).ConfigureAwait(false); + if ( + _ws.State == WebSocketState.Open && + result.MessageType != WebSocketMessageType.Close) { - var memory = _pipe.Writer.GetMemory(); - var result = await _ws.ReceiveAsync(memory, cancelToken).ConfigureAwait(false); - if ( - _ws.State == WebSocketState.Open && - result.MessageType != WebSocketMessageType.Close) - { - _pipe.Writer.Advance(result.Count); - - await _pipe.Writer.FlushAsync(cancelToken).ConfigureAwait(false); + messageBuffer.Advance(result.Count); - if (result.EndOfMessage) break; - } - else if (_ws.State == WebSocketState.CloseReceived) - { - // We got a close message from server or if it still open we got canceled - // in both cases it is important to send back the close message - await SendCorrectCloseFrameToRemoteWebSocket().ConfigureAwait(false); + if (result.EndOfMessage) break; + } + else if (_ws.State == WebSocketState.CloseReceived) + { + // We got a close message from server or if it still open we got canceled + // in both cases it is important to send back the close message + await SendCorrectCloseFrameToRemoteWebSocket().ConfigureAwait(false); - // Cancel so the write thread is canceled before pipe is complete - await _internalCancelSource.CancelAsync(); - } + // Cancel so the read operation is canceled before returning + await _internalCancelSource.CancelAsync(); } } - finally - { - // We have successfully read the whole message, - // make available to reader - // even if failure or we cannot reset the pipe - await _pipe.Writer.CompleteAsync().ConfigureAwait(false); - } } /// From 2d80d9aa1776ac74ef5865d922f0946e40ebe7f4 Mon Sep 17 00:00:00 2001 From: Tomas Date: Thu, 18 Jun 2026 23:46:41 +0200 Subject: [PATCH 3/3] Improve patch coverage for hot-path tests --- .../HomeAssistantConnectionTests.cs | 105 ++++++++++++++++++ .../Net/WebSocketTransportPipelineTests.cs | 99 +++++++++++++++++ 2 files changed, 204 insertions(+) diff --git a/src/Client/NetDaemon.HassClient.Tests/HomeAssistantClientTest/HomeAssistantConnectionTests.cs b/src/Client/NetDaemon.HassClient.Tests/HomeAssistantClientTest/HomeAssistantConnectionTests.cs index 81a69faa4..697bf8c1d 100644 --- a/src/Client/NetDaemon.HassClient.Tests/HomeAssistantClientTest/HomeAssistantConnectionTests.cs +++ b/src/Client/NetDaemon.HassClient.Tests/HomeAssistantClientTest/HomeAssistantConnectionTests.cs @@ -128,6 +128,55 @@ public async Task ResultMessagesShouldStillBePublishedToRawHassMessageSubscriber rawResult.Id.Should().Be(command.Id); } + [Fact] + public async Task ResultMessageWithoutPendingCommandShouldStillBePublishedToRawSubscribers() + { + var pipeline = new TransportPipelineMock(); + + await using var homeAssistantConnection = CreateHomeAssistantConnection(pipeline); + var rawMessages = (homeAssistantConnection as IHomeAssistantHassMessages)!; + var rawResultTask = rawMessages.OnHassMessage + .Where(message => message.Type == "result") + .FirstAsync() + .ToTask(); + + pipeline.AddResponse(new HassMessage { Type = "result", Id = 123, Success = true }); + + var rawResult = await rawResultTask.WaitAsync(TimeSpan.FromSeconds(5)); + + rawResult.Id.Should().Be(123); + } + + [Fact] + public async Task NonResultMessageShouldBypassPendingResultCompletionAndStillBePublished() + { + var pipeline = new TransportPipelineMock(); + pipeline + .Setup(n => n.SendMessageAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + await using var homeAssistantConnection = CreateHomeAssistantConnection(pipeline); + var rawMessages = (homeAssistantConnection as IHomeAssistantHassMessages)!; + var rawEventTask = rawMessages.OnHassMessage + .Where(message => message.Type == "event") + .FirstAsync() + .ToTask(); + + var command = new FakeCommand(); + var resultTask = homeAssistantConnection.SendCommandAndReturnHassMessageResponseAsync(command, CancellationToken.None); + + pipeline.AddResponse(new HassMessage { Type = "event", Id = command.Id }); + var rawEvent = await rawEventTask.WaitAsync(TimeSpan.FromSeconds(5)); + + rawEvent.Id.Should().Be(command.Id); + resultTask.IsCompleted.Should().BeFalse(); + + pipeline.AddResponse(new HassMessage { Type = "result", Id = command.Id, Success = true }); + var result = await resultTask.WaitAsync(TimeSpan.FromSeconds(5)); + + result!.Id.Should().Be(command.Id); + } + [Fact] public async Task SendFailureShouldFaultResultWaiterWithoutBreakingLaterCommands() { @@ -158,6 +207,62 @@ await Assert.ThrowsAsync( result!.Id.Should().Be(command.Id); } + [Fact] + public async Task SendCancellationShouldCancelResultWaiterWithoutBreakingLaterCommands() + { + var pipeline = new TransportPipelineMock(); + var firstSend = true; + using var cancelSource = new CancellationTokenSource(); + pipeline + .Setup(n => n.SendMessageAsync(It.IsAny(), It.IsAny())) + .Returns(async (_, _) => + { + if (!firstSend) + return; + + firstSend = false; + await cancelSource.CancelAsync(); + await Task.FromCanceled(cancelSource.Token); + }); + + await using var homeAssistantConnection = CreateHomeAssistantConnection(pipeline); + + await Assert.ThrowsAsync( + () => homeAssistantConnection.SendCommandAndReturnHassMessageResponseAsync(new FakeCommand(), cancelSource.Token)); + + var command = new FakeCommand(); + var resultTask = homeAssistantConnection.SendCommandAndReturnHassMessageResponseAsync(command, CancellationToken.None); + pipeline.AddResponse(new HassMessage { Type = "result", Id = command.Id, Success = true }); + + var result = await resultTask.WaitAsync(TimeSpan.FromSeconds(5)); + + result!.Id.Should().Be(command.Id); + } + + [Fact] + public async Task FailedResultMessageShouldCompletePendingResultThenThrow() + { + var pipeline = new TransportPipelineMock(); + pipeline + .Setup(n => n.SendMessageAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + await using var homeAssistantConnection = CreateHomeAssistantConnection(pipeline); + var command = new FakeCommand { Type = "test_command" }; + var resultTask = homeAssistantConnection.SendCommandAndReturnHassMessageResponseAsync(command, CancellationToken.None); + + pipeline.AddResponse(new HassMessage + { + Type = "result", + Id = command.Id, + Success = false, + Error = new HassError { Code = 42, Message = "Unable to do what you asked" } + }); + + await Assert.ThrowsAsync( + () => resultTask.WaitAsync(TimeSpan.FromSeconds(5))); + } + [Fact] public async Task DisposingConnectionShouldCancelPendingResultWaiters() { diff --git a/src/Client/NetDaemon.HassClient.Tests/Net/WebSocketTransportPipelineTests.cs b/src/Client/NetDaemon.HassClient.Tests/Net/WebSocketTransportPipelineTests.cs index 97c14caa6..5b9dc76fb 100644 --- a/src/Client/NetDaemon.HassClient.Tests/Net/WebSocketTransportPipelineTests.cs +++ b/src/Client/NetDaemon.HassClient.Tests/Net/WebSocketTransportPipelineTests.cs @@ -25,6 +25,71 @@ public async Task TestGetNextMessageAsyncGetsCorrectMessage() .BeEquivalentTo("auth_required"); } + [Fact] + public async Task TestGetNextMessageAsyncGetsCoalescedMessages() + { + // ARRANGE + WsMock.AddResponse(@"[{""type"": ""event"", ""id"": 1}, {""type"": ""result"", ""id"": 2, ""success"": true}]"); + + // ACT + var msg = await DefaultPipeline.GetNextMessagesAsync(CancellationToken.None).ConfigureAwait(false); + + // ASSERT + msg.Should().HaveCount(2); + msg.Select(n => n.Type).Should().Equal("event", "result"); + msg.Select(n => n.Id).Should().Equal(1, 2); + } + + [Fact] + public async Task TestGetNextMessageAsyncGetsCoalescedMessagesWithLeadingWhitespace() + { + // ARRANGE + WsMock.AddResponse(" \r\n\t" + @"[{""type"": ""event"", ""id"": 1}]"); + + // ACT + var msg = await DefaultPipeline.GetNextMessagesAsync(CancellationToken.None).ConfigureAwait(false); + + // ASSERT + msg.Should().ContainSingle(); + msg[0].Type.Should().Be("event"); + msg[0].Id.Should().Be(1); + } + + [Theory] + [InlineData("")] + [InlineData(" \r\n\t")] + public async Task TestGetNextMessageAsyncOnEmptyPayloadShouldThrowException(string payload) + { + // ARRANGE + WsMock.AddResponse(payload); + + // ACT AND ASSERT + await Assert.ThrowsAsync(async () => + await DefaultPipeline.GetNextMessagesAsync(CancellationToken.None).ConfigureAwait(false)); + } + + [Fact] + public async Task TestGetNextMessageAsyncOnJsonNullShouldThrowException() + { + // ARRANGE + WsMock.AddResponse("null"); + + // ACT AND ASSERT + await Assert.ThrowsAsync(async () => + await DefaultPipeline.GetNextMessagesAsync(CancellationToken.None).ConfigureAwait(false)); + } + + [Fact] + public async Task TestGetNextMessageAsyncOnInvalidJsonShouldThrowException() + { + // ARRANGE + WsMock.AddResponse("{not-valid-json"); + + // ACT AND ASSERT + await Assert.ThrowsAsync(async () => + await DefaultPipeline.GetNextMessagesAsync(CancellationToken.None).ConfigureAwait(false)); + } + [Fact] public async Task TestGetNextMessageAsyncOnClosedSocketShouldCastException() { @@ -115,6 +180,40 @@ await Assert.ThrowsAsync(async () => n.CloseOutputAsync(It.IsAny(), It.IsAny(), It.IsAny())); } + [Fact] + public async Task TestGetNextMessageAsyncOnCloseMessageWithoutCloseReceivedShouldRespectCancellation() + { + // ARRANGE + using var cancelSource = new CancellationTokenSource(); + WsMock.Setup(n => n.ReceiveAsync(It.IsAny>(), It.IsAny())) + .ReturnsAsync(() => + { + cancelSource.Cancel(); + return new ValueWebSocketReceiveResult(0, WebSocketMessageType.Close, true); + }); + + // ACT AND ASSERT + await Assert.ThrowsAsync(async () => + await DefaultPipeline.GetNextMessagesAsync(cancelSource.Token).ConfigureAwait(false)); + + WsMock.Verify(n => + n.CloseOutputAsync(It.IsAny(), It.IsAny(), It.IsAny()), + Times.Never); + } + + [Fact] + public async Task TestSendMessageAsyncWithCancelledTokenShouldThrowException() + { + // ARRANGE + using var cancelSource = new CancellationTokenSource(); + await cancelSource.CancelAsync(); + + // ACT AND ASSERT + await Assert.ThrowsAsync(async () => + await DefaultPipeline.SendMessageAsync(new {test = "test"}, cancelSource.Token) + .ConfigureAwait(false)); + } + [Fact] public async Task TestDisposeAsyncCallsCloseWhenOpen() {