From 2c3248908fcbc682485e6061d73decb83f477735 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Wed, 4 Sep 2024 08:02:57 -0700 Subject: [PATCH] Fix for GroupOnObservable OnCompleted handling (#938) * GroupOnObservable Fix (and Unit Tests) * Remove extra tests --------- Co-authored-by: Roland Pheasant --- .../Cache/GroupOnObservableFixture.cs | 26 ++++++++++++++----- .../Cache/Internal/GroupOnObservable.cs | 7 ++--- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs b/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs index 3e8dd8854..1909b2abb 100644 --- a/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs +++ b/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs @@ -1,6 +1,8 @@ using System; using System.Linq; +using System.Reactive; using System.Reactive.Linq; +using System.Reactive.Subjects; using System.Threading.Tasks; using Bogus; @@ -30,12 +32,14 @@ public class GroupOnObservableFixture : IDisposable private readonly SourceCache _cache = new (p => p.UniqueKey); private readonly ChangeSetAggregator _results; private readonly GroupChangeSetAggregator _groupResults; + private readonly Subject _grouperShutdown; private readonly Faker _faker; private readonly Randomizer _randomizer = new(0x3141_5926); public GroupOnObservableFixture() { _faker = Fakers.Person.Clone().WithSeed(_randomizer); + _grouperShutdown = new(); _results = _cache.Connect().AsAggregator(); _groupResults = _cache.Connect().GroupOnObservable(CreateFavoriteColorObservable).AsAggregator(); } @@ -179,7 +183,7 @@ public void GroupingSequenceCompletesWhenEmpty() } [Fact] - public void AllSequencesCompleteWhenSourceIsDisposed() + public void AllSequencesShouldCompleteWhenSourceAndGroupingObservablesComplete() { // Arrange _cache.AddOrUpdate(_faker.Generate(InitialCount)); @@ -190,6 +194,7 @@ public void AllSequencesCompleteWhenSourceIsDisposed() // Act _cache.Dispose(); + _grouperShutdown.OnNext(Unit.Default); // Assert results.IsCompleted.Should().BeTrue(); @@ -243,9 +248,11 @@ public async Task ResultsContainsCorrectRegroupedValuesAsync() } [Theory] - [InlineData(false)] - [InlineData(true)] - public void ResultCompletesOnlyWhenSourceCompletes(bool completeSource) + [InlineData(false, false)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(true, true)] + public void ResultCompletesOnlyWhenSourceAndAllGroupingObservablesComplete(bool completeSource, bool completeGroups) { // Arrange _cache.AddOrUpdate(_faker.Generate(InitialCount)); @@ -255,10 +262,14 @@ public void ResultCompletesOnlyWhenSourceCompletes(bool completeSource) { _cache.Dispose(); } + if (completeGroups) + { + _grouperShutdown.OnNext(Unit.Default); + } // Assert _results.IsCompleted.Should().Be(completeSource); - _groupResults.IsCompleted.Should().Be(completeSource); + _groupResults.IsCompleted.Should().Be(completeGroups && completeSource); } [Fact] @@ -311,6 +322,7 @@ public void Dispose() _groupResults.Dispose(); _results.Dispose(); _cache.Dispose(); + _grouperShutdown.Dispose(); } private void RandomFavoriteColorChange() @@ -342,6 +354,6 @@ private static void VerifyGroupingResults(ISourceCache cache, Ch groupResults.Groups.Items.ForEach(group => group.Data.Count.Should().BeGreaterThan(0, "Empty groups should be removed")); } - private static IObservable CreateFavoriteColorObservable(Person person, string key) => - person.WhenPropertyChanged(p => p.FavoriteColor).Select(change => change.Value); + private IObservable CreateFavoriteColorObservable(Person person, string key) => + person.WhenPropertyChanged(p => p.FavoriteColor).Select(change => change.Value).TakeUntil(_grouperShutdown); } diff --git a/src/DynamicData/Cache/Internal/GroupOnObservable.cs b/src/DynamicData/Cache/Internal/GroupOnObservable.cs index 62c91720f..8a3d9c3d6 100644 --- a/src/DynamicData/Cache/Internal/GroupOnObservable.cs +++ b/src/DynamicData/Cache/Internal/GroupOnObservable.cs @@ -42,7 +42,9 @@ IObservable CreateGroupObservable(TObject item, TKey key) => // Next process the Grouping observables created for each item var subMergeMany = shared .MergeMany(CreateGroupObservable) - .SubscribeSafe(onError: observer.OnError); + .SubscribeSafe( + onError: observer.OnError, + onCompleted: observer.OnCompleted); // Finally, emit the results var subResults = shared @@ -52,8 +54,7 @@ IObservable CreateGroupObservable(TObject item, TKey key) => grouper.EmitChanges(observer); parentUpdate = false; }, - onError: observer.OnError, - onCompleted: observer.OnCompleted); + onError: observer.OnError); return new CompositeDisposable(shared.Connect(), subMergeMany, subChanges, grouper); });