diff --git a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs index 4f4ddf9a..30d2dc70 100644 --- a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs +++ b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs @@ -30,10 +30,11 @@ public sealed class DeadlockTortureTest { private const int ItemCount = 200; private const int Iterations = 50; - private const int TimeoutSeconds = 15; + private const int TimeoutSeconds = 60; private static async Task RunBidirectionalDeadlockTest( Func>, IObservable>> pipeline, + Action? subjectPusher = null, int iterations = Iterations) { for (var iter = 0; iter < iterations; iter++) @@ -44,11 +45,13 @@ private static async Task RunBidirectionalDeadlockTest( using var aToB = pipeline(sourceA.Connect().Filter(x => x.Name.StartsWith("A"))).PopulateInto(sourceB); using var bToA = pipeline(sourceB.Connect().Filter(x => x.Name.StartsWith("B"))).PopulateInto(sourceA); - using var barrier = new Barrier(2); + var participants = subjectPusher is null ? 2 : 3; + using var barrier = new Barrier(participants); var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }); var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }); + var taskC = subjectPusher is null ? null : Task.Run(() => { barrier.SignalAndWait(); subjectPusher(); }); - var completed = Task.WhenAll(taskA, taskB); + var completed = taskC is null ? Task.WhenAll(taskA, taskB) : Task.WhenAll(taskA, taskB, taskC); if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed) return false; } @@ -64,26 +67,84 @@ [Fact] public async Task AutoRefresh_DoesNotDeadlock() => [Fact] public async Task GroupOn_DoesNotDeadlock() => (await RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()))).Should().BeTrue(); + [Fact] public async Task GroupWithImmutableState_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey))).Should().BeTrue(); + [Fact] public async Task Page_DoesNotDeadlock() { using var req = new BehaviorSubject(new PageRequest(1, 50)); - (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + + [Fact] public async Task SortAndPage_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new PageRequest(1, 50)); + (await RunBidirectionalDeadlockTest( + s => s.SortAndPage(SortExpressionComparer.Ascending(p => p.Age), req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue(); } [Fact] public async Task Virtualise_DoesNotDeadlock() { using var req = new BehaviorSubject(new VirtualRequest(0, 50)); - (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + + [Fact] public async Task SortAndVirtualize_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new VirtualRequest(0, 50)); + (await RunBidirectionalDeadlockTest( + s => s.SortAndVirtualize(SortExpressionComparer.Ascending(p => p.Age), req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + + [Fact] public async Task QueryWhenChanged_DoesNotDeadlock() + { + for (var iter = 0; iter < Iterations; iter++) + { + using var sourceA = new SourceCache(p => p.UniqueKey); + using var sourceB = new SourceCache(p => p.UniqueKey); + + // QueryWhenChanged with an itemChangedTrigger exercises the Merge branch. + // A side-channel write into the other cache closes the same ABBA cycle that + // PopulateInto would close for changeset-shaped operators. + using var aToB = sourceA.Connect() + .Filter(p => p.Name.StartsWith("A")) + .QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age)) + .Subscribe(_ => sourceB.AddOrUpdate(new Person("A-marker", 0))); + using var bToA = sourceB.Connect() + .Filter(p => p.Name.StartsWith("B")) + .QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age)) + .Subscribe(_ => sourceA.AddOrUpdate(new Person("B-marker", 0))); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }); + + var completed = Task.WhenAll(taskA, taskB); + (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter); + } } [Fact] public async Task TransformWithForce_DoesNotDeadlock() { using var force = new Subject>(); - (await RunBidirectionalDeadlockTest(s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) force.OnNext(static (p, _) => true); })).Should().BeTrue(); } - [Fact] public async Task BatchIf_DoesNotDeadlock() => - (await RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject(false), false, (TimeSpan?)null))).Should().BeTrue(); + [Fact] public async Task BatchIf_DoesNotDeadlock() + { + using var pause = new BehaviorSubject(false); + (await RunBidirectionalDeadlockTest( + s => s.BatchIf(pause, false, (TimeSpan?)null), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); })).Should().BeTrue(); + } [Fact] public async Task DisposeMany_DoesNotDeadlock() => (await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue(); @@ -94,31 +155,65 @@ [Fact] public async Task OnItemRemoved_DoesNotDeadlock() => [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() { using var pageReq = new BehaviorSubject(new PageRequest(1, 100)); + using var virtReq = new BehaviorSubject(new VirtualRequest(0, 100)); using var force = new Subject>(); (await RunBidirectionalDeadlockTest( - s => s.AutoRefresh(p => p.Age) + s => s.GroupWithImmutableState(p => p.Age % 3) + .TransformMany(g => g.Items, p => p.UniqueKey) + .AutoRefresh(p => p.Age) .Filter(p => p.Age >= 0) .Transform((p, k) => new Person("X-" + p.Name, p.Age), force) .OnItemRemoved(_ => { }) .DisposeMany() .Sort(SortExpressionComparer.Ascending(p => p.Age)) + .Virtualise(virtReq) .Page(pageReq), + subjectPusher: () => + { + for (var j = 0; j < ItemCount; j++) + { + force.OnNext(static (p, _) => true); + pageReq.OnNext(new PageRequest(1 + (j % 4), 50 + (j % 4) * 50)); + virtReq.OnNext(new VirtualRequest(j * 5, 50 + (j % 4) * 50)); + } + }, iterations: Iterations * 2)).Should().BeTrue(); } [Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock() { using var pageReq = new BehaviorSubject(new PageRequest(1, 50)); + using var pageReq2 = new BehaviorSubject(new PageRequest(1, 50)); using var virtReq = new BehaviorSubject(new VirtualRequest(0, 50)); + using var virtReq2 = new BehaviorSubject(new VirtualRequest(0, 50)); + using var pause = new BehaviorSubject(false); var results = await Task.WhenAll( - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)), 30), - RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), 30), - RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), 30), - RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), 30), - RunBidirectionalDeadlockTest(s => s.DisposeMany(), 30), - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(pageReq), 30), - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(virtReq), 30), - RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject(false), false, (TimeSpan?)null), 30)); + RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)), iterations: 30), + RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), iterations: 30), + RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), iterations: 30), + RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey), iterations: 30), + RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), iterations: 30), + RunBidirectionalDeadlockTest(s => s.DisposeMany(), iterations: 30), + RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(pageReq), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.SortAndPage(SortExpressionComparer.Ascending(p => p.Age), pageReq2), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq2.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(virtReq), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.SortAndVirtualize(SortExpressionComparer.Ascending(p => p.Age), virtReq2), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq2.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.BatchIf(pause, false, (TimeSpan?)null), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); }, + iterations: 30)); results.Should().AllSatisfy(r => r.Should().BeTrue()); } diff --git a/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs b/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs new file mode 100644 index 00000000..fe67508a --- /dev/null +++ b/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs @@ -0,0 +1,232 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +/// +/// Focused behavioural tests for . +/// Verifies the Rx Merge-compatible terminal semantics and the queue's serialization guarantee +/// for concurrent producers. +/// +public sealed class DeliveryQueueMergeFixture +{ + [Fact] + public void OnNext_FromAllSources_IsForwardedInArrivalOrder() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var received = new List(); + using var sub = a.DeliveryQueueMerge(b, c).Subscribe(received.Add); + + a.OnNext(1); + b.OnNext(2); + c.OnNext(3); + a.OnNext(4); + + received.Should().Equal(1, 2, 3, 4); + } + + [Fact] + public void OnCompleted_FiresOnlyAfterEverySourceCompletes() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var completed = false; + using var sub = a.DeliveryQueueMerge(b, c).Subscribe(_ => { }, () => completed = true); + + a.OnCompleted(); + completed.Should().BeFalse(); + + b.OnCompleted(); + completed.Should().BeFalse(); + + c.OnCompleted(); + completed.Should().BeTrue(); + } + + [Fact] + public void OnError_FromAnySource_TerminatesImmediately() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse(); + } + + [Fact] + public void OnError_AfterFirstError_IsDroppedByQueue() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => { }); + + var first = new InvalidOperationException("first"); + var second = new InvalidOperationException("second"); + a.OnError(first); + b.OnError(second); + + captured.Should().BeSameAs(first); + } + + [Fact] + public void OnCompleted_AfterError_IsDroppedByQueue() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + b.OnCompleted(); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse(); + } + + [Fact] + public void SynchronousTerminal_AtSubscribe_IsCountedTowardCompletion() + { + var immediate = Observable.Empty(); + using var live = new Subject(); + + var completed = false; + using var sub = immediate.DeliveryQueueMerge(live).Subscribe(_ => { }, () => completed = true); + + completed.Should().BeFalse(); + live.OnCompleted(); + completed.Should().BeTrue(); + } + + [Fact] + public void SynchronousError_AtSubscribe_PropagatesImmediately() + { + var error = new InvalidOperationException(); + var immediate = Observable.Throw(error); + using var live = new Subject(); + + Exception? captured = null; + using var sub = immediate.DeliveryQueueMerge(live).Subscribe(_ => { }, e => captured = e); + + captured.Should().BeSameAs(error); + } + + [Fact] + public async Task ConcurrentOnNext_FromManyProducers_IsSerializedToObserver() + { + // The queue's contract is that the downstream observer never sees concurrent OnNext calls, + // regardless of how many producers are racing on the inputs. Subscribe to two sources via + // two concurrent tasks, push interleaved items, and verify that no two OnNext calls overlap + // and every item is delivered exactly once. + const int itemsPerProducer = 1_000; + + using var a = new Subject(); + using var b = new Subject(); + + var inFlight = 0; + var maxInFlight = 0; + var received = new ConcurrentQueue(); + + using var sub = a.DeliveryQueueMerge(b).Subscribe(v => + { + var now = Interlocked.Increment(ref inFlight); + var prev = Volatile.Read(ref maxInFlight); + while (now > prev && Interlocked.CompareExchange(ref maxInFlight, now, prev) != prev) + { + prev = Volatile.Read(ref maxInFlight); + } + received.Enqueue(v); + Interlocked.Decrement(ref inFlight); + }); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < itemsPerProducer; i++) a.OnNext(i); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < itemsPerProducer; i++) b.OnNext(itemsPerProducer + i); }); + + await Task.WhenAll(taskA, taskB); + + received.Count.Should().Be(itemsPerProducer * 2); + maxInFlight.Should().Be(1, "concurrent OnNext to the observer must be serialized by the queue"); + + var expected = Enumerable.Range(0, itemsPerProducer * 2).ToHashSet(); + received.Should().BeEquivalentTo(expected); + } + + [Fact] + public void Subscription_OccursInArgumentOrder() + { + var subscribed = new List(); + var first = Observable.Create(o => { subscribed.Add(0); return () => { }; }); + var second = Observable.Create(o => { subscribed.Add(1); return () => { }; }); + var third = Observable.Create(o => { subscribed.Add(2); return () => { }; }); + + using var sub = first.DeliveryQueueMerge(second, third).Subscribe(_ => { }); + + subscribed.Should().Equal(0, 1, 2); + } + + [Fact] + public void Dispose_StopsForwardingFromAnySource() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + var sub = a.DeliveryQueueMerge(b).Subscribe(received.Add); + + a.OnNext(1); + sub.Dispose(); + a.OnNext(2); + b.OnNext(3); + + received.Should().Equal(1); + } + + [Fact] + public void NoOthers_FallsBackToFirstAlone() + { + using var a = new Subject(); + var received = new List(); + var completed = false; + using var sub = a.DeliveryQueueMerge().Subscribe(received.Add, () => completed = true); + + a.OnNext(7); + a.OnNext(11); + a.OnCompleted(); + + received.Should().Equal(7, 11); + completed.Should().BeTrue(); + } +} \ No newline at end of file diff --git a/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs b/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs new file mode 100644 index 00000000..85b95b8e --- /dev/null +++ b/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs @@ -0,0 +1,175 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Reactive.Subjects; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +/// +/// Focused behavioural tests for . +/// Covers the contract the helper has to honour as a drop-in +/// replacement: subscription order, all-must-complete OnCompleted, first-error-wins OnError, and synchronous terminal +/// notifications. +/// +public sealed class UnsynchronizedMergeFixture +{ + [Fact] + public void OnNext_FromBothSources_IsForwardedInArrivalOrder() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + using var sub = a.UnsynchronizedMerge(b).Subscribe(received.Add); + + a.OnNext(1); + b.OnNext(2); + a.OnNext(3); + b.OnNext(4); + + received.Should().Equal(1, 2, 3, 4); + } + + [Fact] + public void OnCompleted_FiresOnlyAfterAllSourcesComplete() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var completed = false; + using var sub = a.UnsynchronizedMerge(b, c).Subscribe(_ => { }, () => completed = true); + + a.OnCompleted(); + completed.Should().BeFalse("a single source completion must not terminate the merged stream"); + + b.OnCompleted(); + completed.Should().BeFalse("two of three completions still leave one source live"); + + c.OnCompleted(); + completed.Should().BeTrue("after every source has completed the merged stream must emit OnCompleted"); + } + + [Fact] + public void OnError_FromAnySource_TerminatesImmediately() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedMerge(b, c).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException("first"); + b.OnError(error); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("OnCompleted must not fire after OnError"); + } + + [Fact] + public void OnError_AfterFirstError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + using var sub = a.UnsynchronizedMerge(b).Subscribe(_ => { }, e => captured = e, () => { }); + + var first = new InvalidOperationException("first"); + var second = new InvalidOperationException("second"); + a.OnError(first); + b.OnError(second); + + captured.Should().BeSameAs(first, "first error wins; subsequent errors from other sources must be dropped"); + } + + [Fact] + public void OnCompleted_AfterError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + b.OnCompleted(); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("a late OnCompleted from a surviving source must not arrive after OnError has fired"); + } + + [Fact] + public void Subscription_OccursInArgumentOrder() + { + var subscribed = new List(); + var first = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(0); return () => { }; }); + var second = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(1); return () => { }; }); + var third = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(2); return () => { }; }); + + using var sub = first.UnsynchronizedMerge(second, third).Subscribe(_ => { }); + + subscribed.Should().Equal(0, 1, 2); + } + + [Fact] + public void SynchronousTerminal_BeforeOtherSourcesSubscribe_IsHandled() + { + // A source that completes synchronously at subscribe time decrements the pending counter immediately. + // If the helper miscounted, the merged stream would either complete prematurely or never complete. + var immediate = System.Reactive.Linq.Observable.Empty(); + using var live = new Subject(); + + var completed = false; + using var sub = immediate.UnsynchronizedMerge(live).Subscribe(_ => { }, () => completed = true); + + completed.Should().BeFalse("the live source has not completed yet"); + + live.OnCompleted(); + + completed.Should().BeTrue(); + } + + [Fact] + public void SynchronousError_BeforeOtherSourcesSubscribe_TerminatesImmediately() + { + var error = new InvalidOperationException(); + var immediate = System.Reactive.Linq.Observable.Throw(error); + using var live = new Subject(); + + Exception? captured = null; + using var sub = immediate.UnsynchronizedMerge(live).Subscribe(_ => { }, e => captured = e); + + captured.Should().BeSameAs(error); + } + + [Fact] + public void NoOthers_FallsBackToFirstAlone() + { + // Boundary: zero entries in the params array. Behaviour must mirror Observable.Merge over a single source. + using var a = new Subject(); + var received = new List(); + var completed = false; + using var sub = a.UnsynchronizedMerge().Subscribe(received.Add, () => completed = true); + + a.OnNext(7); + a.OnNext(11); + a.OnCompleted(); + + received.Should().Equal(7, 11); + completed.Should().BeTrue(); + } +} \ No newline at end of file diff --git a/src/DynamicData/Cache/Internal/AutoRefresh.cs b/src/DynamicData/Cache/Internal/AutoRefresh.cs index ee81fe58..f1f17d67 100644 --- a/src/DynamicData/Cache/Internal/AutoRefresh.cs +++ b/src/DynamicData/Cache/Internal/AutoRefresh.cs @@ -32,9 +32,8 @@ public IObservable> Run() => Observable.Create list.Count > 0).Select(items => new ChangeSet(items)); // publish refreshes and underlying changes - var queue = new SharedDeliveryQueue(); - var publisher = shared.SynchronizeSafe(queue).Merge(refreshChanges.SynchronizeSafe(queue)).SubscribeSafe(observer); + var publisher = shared.DeliveryQueueMerge(refreshChanges).SubscribeSafe(observer); - return new CompositeDisposable(publisher, shared.Connect(), queue); + return new CompositeDisposable(publisher, shared.Connect()); }); } diff --git a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs index ba33db03..fc6f3b16 100644 --- a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs +++ b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs @@ -29,7 +29,7 @@ public IObservable> Run() => var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0); - return new CompositeDisposable(groups.Merge(regroup).SubscribeSafe(observer), queue); + return new CompositeDisposable(groups.UnsynchronizedMerge(regroup).SubscribeSafe(observer), queue); }); private sealed class Grouper(Func groupSelectorKey) diff --git a/src/DynamicData/Cache/Internal/Page.cs b/src/DynamicData/Cache/Internal/Page.cs index e4159587..2d91ac29 100644 --- a/src/DynamicData/Cache/Internal/Page.cs +++ b/src/DynamicData/Cache/Internal/Page.cs @@ -19,7 +19,7 @@ public IObservable> Run() => Observable.Create updates is not null) .Select(x => x!) .SubscribeSafe(observer), queue); diff --git a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs index 01828e85..7e976424 100644 --- a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs +++ b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs @@ -49,7 +49,7 @@ public IObservable> Run() return cache; }).Select(list => new AnonymousQuery(list)); - return new CompositeDisposable(sourceChanged.Merge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue); + return new CompositeDisposable(sourceChanged.UnsynchronizedMerge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue); }); } } diff --git a/src/DynamicData/Cache/Internal/Sort.cs b/src/DynamicData/Cache/Internal/Sort.cs index 11b39035..6afb9632 100644 --- a/src/DynamicData/Cache/Internal/Sort.cs +++ b/src/DynamicData/Cache/Internal/Sort.cs @@ -57,7 +57,7 @@ public IObservable> Run() => Observable.Create result is not null).Select(x => x!).SubscribeSafe(observer), queue); + return new CompositeDisposable(comparerChanged.UnsynchronizedMerge(dataChanged, sortAgain).Where(result => result is not null).Select(x => x!).SubscribeSafe(observer), queue); }); private sealed class Sorter(SortOptimisations optimisations, IComparer? comparer = null, int resetThreshold = -1) diff --git a/src/DynamicData/Cache/Internal/SortAndPage.cs b/src/DynamicData/Cache/Internal/SortAndPage.cs index 2d612a9b..02ef996d 100644 --- a/src/DynamicData/Cache/Internal/SortAndPage.cs +++ b/src/DynamicData/Cache/Internal/SortAndPage.cs @@ -111,10 +111,10 @@ public IObservable>> Run() => return ApplyPagedChanges(changes); }); - return new CompositeDisposable(Observable.Merge( - comparerChanged.Skip(1), - paramsChanged.Where(changes => changes.Count is not 0), - dataChange.Where(changes => changes.Count is not 0)) + return new CompositeDisposable(comparerChanged.Skip(1) + .UnsynchronizedMerge( + paramsChanged.Where(changes => changes.Count is not 0), + dataChange.Where(changes => changes.Count is not 0)) .SubscribeSafe(observer), queue); ChangeSet> ApplyPagedChanges(IChangeSet? changeSet = null) diff --git a/src/DynamicData/Cache/Internal/SortAndVirtualize.cs b/src/DynamicData/Cache/Internal/SortAndVirtualize.cs index 44c6d3dc..0e6ae550 100644 --- a/src/DynamicData/Cache/Internal/SortAndVirtualize.cs +++ b/src/DynamicData/Cache/Internal/SortAndVirtualize.cs @@ -113,8 +113,7 @@ public IObservable>> Run() => return new CompositeDisposable( comparerChanged - .Merge(paramsChanged) - .Merge(dataChange) + .UnsynchronizedMerge(paramsChanged, dataChange) .Where(changes => changes.Count is not 0) .SubscribeSafe(observer), queue); diff --git a/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs b/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs index c7c95aa6..4bce97ac 100644 --- a/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs +++ b/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs @@ -25,7 +25,7 @@ public IObservable> Run() => Observable.Create CaptureChanges(cache, selector)).Select(changes => new ChangeSet(changes)).NotEmpty(); - var sourceAndRefreshes = shared.Merge(refresher); + var sourceAndRefreshes = shared.UnsynchronizedMerge(refresher); // do raw transform var transform = new Transform(sourceAndRefreshes, transformFactory, exceptionCallback, true).Run(); diff --git a/src/DynamicData/Cache/Internal/Virtualise.cs b/src/DynamicData/Cache/Internal/Virtualise.cs index 9a5fefbc..9e858f5d 100644 --- a/src/DynamicData/Cache/Internal/Virtualise.cs +++ b/src/DynamicData/Cache/Internal/Virtualise.cs @@ -23,7 +23,7 @@ public IObservable> Run() => Observable.Create< var request = _virtualRequests.SynchronizeSafe(queue).Select(virtualiser.Virtualise).Where(x => x is not null).Select(x => x!); var dataChange = _source.SynchronizeSafe(queue).Select(virtualiser.Update).Where(x => x is not null).Select(x => x!); - return new CompositeDisposable(request.Merge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue); + return new CompositeDisposable(request.UnsynchronizedMerge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue); }); private sealed class Virtualiser(VirtualRequest? request = null) diff --git a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs new file mode 100644 index 00000000..9adefaeb --- /dev/null +++ b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs @@ -0,0 +1,79 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive; +using System.Reactive.Disposables; +using System.Reactive.Linq; + +namespace DynamicData.Internal; + +/// +/// Provides DeliveryQueueMerge, an Rx extension method that serializes the +/// notifications of every input through a single +/// and emits them on the downstream observer outside the queue's lock. +/// +/// +/// Drop-in alternative to +/// for cross-cache pipelines where the Rx Merge gate, held during downstream delivery, +/// would risk an ABBA cycle. serializes enqueues across +/// concurrent producers but releases its gate before delivering, so a downstream +/// observer that walks into another cache's writer lock cannot deadlock with this +/// operator's serialization point. +/// Every input must share the same element type. When the inputs have different +/// element types or require operator-private projections invoked inside the queue's +/// drain, use +/// with a and finish with +/// . +/// +internal static class DeliveryQueueMergeExtensions +{ + /// + /// Merges with by routing every + /// source through a single . Functionally equivalent + /// to : completes + /// only after every source completes; the first error terminates; subscription + /// occurs in argument order. + /// + /// The element type, common to every input. + /// The first input observable. + /// Additional input observables. + /// An observable that emits items from every input, serialized through the queue. + public static IObservable DeliveryQueueMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var queue = new DeliveryQueue(observer); + var totalSources = others.Length + 1; + var subscriptions = new CompositeDisposable(totalSources + 1); + var pending = totalSources; + + // Each source needs its own inner observer instance because Rx's ObserverBase + // sets a one-shot stopped flag on the first OnCompleted/OnError; a single shared + // observer would silently drop terminal notifications from every source after + // the first. OnNext and OnError forward straight to the queue (the queue's gate + // serializes concurrent calls); OnCompleted is counter-gated so only the last + // surviving source's completion terminates the merged stream. + IObserver CreateInner() => + Observer.Create( + queue.OnNext, + queue.OnError, + () => + { + if (Interlocked.Decrement(ref pending) == 0) + { + queue.OnCompleted(); + } + }); + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(CreateInner())); + } + + // Subscription first so any terminal notification produced during Rx's disposal + // cascade still flows through the still-active queue. Queue last as cleanup. + subscriptions.Add(queue); + return subscriptions; + }); +} diff --git a/src/DynamicData/Internal/SharedDeliveryQueue.cs b/src/DynamicData/Internal/SharedDeliveryQueue.cs index 6eab2889..9ec57a8e 100644 --- a/src/DynamicData/Internal/SharedDeliveryQueue.cs +++ b/src/DynamicData/Internal/SharedDeliveryQueue.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. diff --git a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs index ace8ed4a..283dfff5 100644 --- a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs +++ b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs @@ -2,6 +2,7 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; @@ -78,4 +79,74 @@ public static IObservable SynchronizeSafe(this IObservable source) => // Queue first: ensures in-flight deliveries complete before teardown side effects run return new CompositeDisposable(queue, source.SubscribeSafe(queue)); }); + + /// + /// Merges with into a single observable + /// without taking any synchronization gate. Functionally equivalent to + /// : completes only after + /// every source completes; the first error terminates; subscription occurs in argument order. + /// + /// + /// The caller MUST ensure that delivery from every source is already serialized. + /// In this library the precondition is satisfied by routing every source through the + /// same via + /// . The shared + /// queue's drain loop guarantees that at most one notification is in flight to the + /// downstream observer at a time, so the additional gate that Observable.Merge + /// would install is redundant. + /// Removing that gate matters in cross-cache pipelines: Observable.Merge + /// holds its private _gate for the entire duration of downstream delivery, and + /// when downstream delivery walks into another cache's writer lock, two such gates on + /// two operators form an ABBA cycle that the queue-drain design is meant to prevent. + /// Each source is subscribed through its own + /// instance. The actions close over shared pending and terminated counters, but + /// the observer instances must be distinct because Rx's ObserverBase sets a one-shot + /// stopped flag on the first OnCompleted/OnError; a single shared observer + /// would silently drop terminal notifications from every source after the first. + /// Without the external serialization precondition, concurrent OnNext + /// calls into the shared observer will race. Do not use as a general-purpose + /// Observable.Merge replacement. + /// + public static IObservable UnsynchronizedMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var totalSources = others.Length + 1; + var subscriptions = new CompositeDisposable(totalSources); + var pending = totalSources; + var terminated = 0; + + void OnNextSafe(T value) + { + if (Volatile.Read(ref terminated) == 0) + { + observer.OnNext(value); + } + } + + void OnErrorSafe(Exception error) + { + if (Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnError(error); + } + } + + void OnCompletedSafe() + { + if (Interlocked.Decrement(ref pending) == 0 && Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnCompleted(); + } + } + + IObserver CreateInner() => Observer.Create(OnNextSafe, OnErrorSafe, OnCompletedSafe); + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(CreateInner())); + } + + return subscriptions; + }); }