From 04e2ff8aebd0dd081bd4a39cb3af5651438f6ea7 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sat, 10 Feb 2024 10:40:33 -0800 Subject: [PATCH] Feature: GroupOnObservable Operator (#847) New Group Operator that groups values based on the latest value returned from an observable created using the given factory function. - Values are not observed downstream until their Observable fires once - Any event after the first will have the effect of moving the value from one group to another - Any errors in the grouping observable will bring down the entire stream - If the source observable completes, the downstream sequence will also complete (and the sequence from each Group will complete). - If the number of items in a group drops to zero, the group will be removed and any subscribers to the Group's Cache will see OnComplete - If the number of items in a group drops to zero, but within the same changeset, another item gets added to that group, the group will NOT be removed/re-added. --- ...ts.DynamicDataTests.DotNet8_0.verified.txt | 27 ++ .../Cache/GroupOnObservableFixture.cs | 345 ++++++++++++++++++ .../Cache/GroupOnPropertyFixture.cs | 3 +- src/DynamicData.Tests/Domain/Fakers.cs | 18 +- src/DynamicData.Tests/Domain/Person.cs | 36 ++ .../Cache/Internal/DynamicGrouper.cs | 226 ++++++++++++ .../Cache/Internal/GroupOnObservable.cs | 60 +++ src/DynamicData/Cache/ObservableCacheEx.cs | 39 ++ .../Cache/Tests/GroupChangeSetAggregator.cs | 105 ++++++ src/DynamicData/Cache/Tests/TestEx.cs | 19 + 10 files changed, 875 insertions(+), 3 deletions(-) create mode 100644 src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs create mode 100644 src/DynamicData/Cache/Internal/DynamicGrouper.cs create mode 100644 src/DynamicData/Cache/Internal/GroupOnObservable.cs create mode 100644 src/DynamicData/Cache/Tests/GroupChangeSetAggregator.cs diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt index 5836f0562..faddb47df 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt @@ -1367,6 +1367,14 @@ namespace DynamicData where TObject : notnull where TKey : notnull where TGroupKey : notnull { } + public static System.IObservable> GroupOnObservable(this System.IObservable> source, System.Func> groupObservableSelector) + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull { } + public static System.IObservable> GroupOnObservable(this System.IObservable> source, System.Func> groupObservableSelector) + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull { } public static System.IObservable> GroupOnProperty(this System.IObservable> source, System.Linq.Expressions.Expression> propertySelector, System.TimeSpan? propertyChangedThrottle = default, System.Reactive.Concurrency.IScheduler? scheduler = null) where TObject : System.ComponentModel.INotifyPropertyChanged where TKey : notnull @@ -2869,6 +2877,21 @@ namespace DynamicData.Tests public void Dispose() { } protected virtual void Dispose(bool isDisposing) { } } + public class GroupChangeSetAggregator : System.IDisposable + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull + { + public GroupChangeSetAggregator(System.IObservable> source) { } + public DynamicData.IObservableCache, TGroupKey> Data { get; } + public System.Exception? Error { get; } + public DynamicData.IObservableCache, TGroupKey> Groups { get; } + public bool IsCompleted { get; } + public System.Collections.Generic.IReadOnlyList> Messages { get; } + public DynamicData.Diagnostics.ChangeSummary Summary { get; } + public void Dispose() { } + protected virtual void Dispose(bool disposing) { } + } public static class ListTextEx { public static DynamicData.Tests.ChangeSetAggregator AsAggregator(this System.IObservable> source) @@ -2914,6 +2937,10 @@ namespace DynamicData.Tests public static DynamicData.Tests.VirtualChangeSetAggregator AsAggregator(this System.IObservable> source) where TObject : notnull where TKey : notnull { } + public static DynamicData.Tests.GroupChangeSetAggregator AsAggregator(this System.IObservable> source) + where TValue : notnull + where TKey : notnull + where TGroupKey : notnull { } } public class VirtualChangeSetAggregator : System.IDisposable where TObject : notnull diff --git a/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs b/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs new file mode 100644 index 000000000..3ebafb2ab --- /dev/null +++ b/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs @@ -0,0 +1,345 @@ +using System; +using System.Linq; +using Bogus; +using DynamicData.Tests.Domain; +using DynamicData.Binding; +using System.Reactive.Linq; +using FluentAssertions; +using Xunit; + +using Person = DynamicData.Tests.Domain.Person; +using System.Threading.Tasks; +using DynamicData.Kernel; + +namespace DynamicData.Tests.Cache; + +public class GroupOnObservableFixture : IDisposable +{ +#if DEBUG + private const int InitialCount = 7; + private const int AddCount = 5; + private const int RemoveCount = 3; + private const int UpdateCount = 2; +#else + private const int InitialCount = 103; + private const int AddCount = 53; + private const int RemoveCount = 37; + private const int UpdateCount = 101; +#endif + private readonly SourceCache _personCache = new (p => p.UniqueKey); + private readonly ChangeSetAggregator _personResults; + private readonly GroupChangeSetAggregator _favoriteColorResults; + private readonly Faker _personFaker; + private readonly Randomizer _randomizer = new(0x3141_5926); + + public GroupOnObservableFixture() + { + _personFaker = Fakers.Person.Clone().WithSeed(_randomizer); + _personResults = _personCache.Connect().AsAggregator(); + _favoriteColorResults = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable).AsAggregator(); + } + + [Fact] + public void ResultContainsAllInitialChildren() + { + // Arrange + + // Act + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + + // Assert + _personResults.Data.Count.Should().Be(InitialCount); + _personResults.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); + VerifyGroupingResults(); + } + + [Fact] + public void ResultContainsAddedValues() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + + // Act + _personCache.AddOrUpdate(_personFaker.Generate(AddCount)); + + // Assert + _personResults.Data.Count.Should().Be(InitialCount + AddCount); + _personResults.Messages.Count.Should().Be(2, "Initial Adds and then the subsequent Additions should each be a single message"); + VerifyGroupingResults(); + } + + [Fact] + public void ResultDoesNotContainRemovedValues() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + + // Act + _personCache.RemoveKeys(_randomizer.ListItems(_personCache.Items.ToList(), RemoveCount).Select(p => p.UniqueKey)); + + // Assert + _personResults.Data.Count.Should().Be(InitialCount - RemoveCount); + _personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + VerifyGroupingResults(); + } + + [Fact] + public void ResultContainsUpdatedValues() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + var replacements = _randomizer.ListItems(_personCache.Items.ToList(), UpdateCount) + .Select(replacePerson => Person.CloneUniqueId(_personFaker.Generate(), replacePerson)); + + // Act + _personCache.AddOrUpdate(replacements); + + // Assert + _personResults.Data.Count.Should().Be(InitialCount, "Only replacements were made"); + _personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Updates"); + VerifyGroupingResults(); + } + + [Fact] + public void GroupRemovedWhenEmpty() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); + var removeColor = _randomizer.ListItem(usedColorList); + var colorCount = usedColorList.Count; + + // Act + _personCache.Edit(updater => updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey))); + + // Assert + _personCache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount - 1); + _personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + _favoriteColorResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + _favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount); + _favoriteColorResults.Summary.Overall.Removes.Should().Be(1); + VerifyGroupingResults(); + } + + [Fact] + public void GroupNotRemovedIfAddedBackImmediately() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); + var removeColor = _randomizer.ListItem(usedColorList); + var colorCount = usedColorList.Count; + + // Act + _personCache.Edit(updater => + { + updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey)); + var newPerson = _personFaker.Generate(); + newPerson.FavoriteColor = removeColor; + updater.AddOrUpdate(newPerson); + }); + + // Assert + _personCache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount); + _personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Other Added Value"); + _favoriteColorResults.Messages.Count.Should().Be(1, "Shouldn't be removed/re-added"); + _favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount); + _favoriteColorResults.Summary.Overall.Removes.Should().Be(0); + VerifyGroupingResults(); + } + + [Fact] + public void GroupingSequenceCompletesWhenEmpty() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); + var removeColor = _randomizer.ListItem(usedColorList); + + var results = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable) + .Filter(grp => grp.Key == removeColor) + .Take(1) + .MergeMany(grp => grp.Cache.Connect()) + .AsAggregator(); + + // Act + _personCache.Edit(updater => updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey))); + + // Assert + results.IsCompleted.Should().BeTrue(); + VerifyGroupingResults(); + } + + [Fact] + public void AllSequencesCompleteWhenSourceIsDisposed() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + + var results = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable) + .MergeMany(grp => grp.Cache.Connect()) + .AsAggregator(); + + // Act + _personCache.Dispose(); + + // Assert + results.IsCompleted.Should().BeTrue(); + VerifyGroupingResults(); + } + + [Fact] + public void AllGroupsRemovedWhenCleared() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); + var colorCount = usedColorList.Count; + + // Act + _personCache.Clear(); + + // Assert + _personCache.Items.Count().Should().Be(0); + _personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + _favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount); + _favoriteColorResults.Summary.Overall.Removes.Should().Be(colorCount); + VerifyGroupingResults(); + } + + [Fact] + public void ResultsContainsCorrectRegroupedValues() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + + // Act + Enumerable.Range(0, UpdateCount).ForEach(_ => RandomFavoriteColorChange()); + + // Assert + VerifyGroupingResults(); + } + + [Fact] + public async Task ResultsContainsCorrectRegroupedValuesAsync() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + var tasks = Enumerable.Range(0, UpdateCount).Select(_ => Task.Run(RandomFavoriteColorChange)); + + // Act + await Task.WhenAll(tasks.ToArray()); + + // Assert + VerifyGroupingResults(); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public void ResultCompletesOnlyWhenSourceCompletes(bool completeSource) + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + + // Act + if (completeSource) + { + _personCache.Dispose(); + } + + // Assert + _personResults.IsCompleted.Should().Be(completeSource); + } + + [Fact] + public void ResultFailsIfSourceFails() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + var expectedError = new Exception("Expected"); + var throwObservable = Observable.Throw>(expectedError); + using var results = _personCache.Connect().Concat(throwObservable).GroupOnObservable(CreateFavoriteColorObservable).AsAggregator(); + + // Act + _personCache.Dispose(); + + // Assert + results.Error.Should().Be(expectedError); + } + + [Fact] + public void ResultFailsIfGroupObservableFails() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + var expectedError = new Exception("Expected"); + var throwObservable = Observable.Throw(expectedError); + + // Act + using var results = _personCache.Connect().GroupOnObservable((person, key) => CreateFavoriteColorObservable(person, key).Take(1).Concat(throwObservable)).AsAggregator(); + + // Assert + results.Error.Should().Be(expectedError); + } + + [Fact] + public void OnErrorFiresIfSelectorThrows() + { + // Arrange + _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + var expectedError = new Exception("Expected"); + + // Act + using var results = _personCache.Connect().GroupOnObservable(_ => throw expectedError).AsAggregator(); + + // Assert + results.Error.Should().Be(expectedError); + } + + public void Dispose() + { + _favoriteColorResults.Dispose(); + _personResults.Dispose(); + _personCache.Dispose(); + } + + private void RandomFavoriteColorChange() + { + var person = _randomizer.ListItem(_personCache.Items.ToList()); + lock (person) + { + // Pick a new favorite color + person.FavoriteColor = _randomizer.RandomColor(person.FavoriteColor); + } + } + + private void VerifyGroupingResults() => + VerifyGroupingResults(_personCache, _personResults, _favoriteColorResults); + + private static void VerifyGroupingResults(ISourceCache personCache, ChangeSetAggregator personResults, GroupChangeSetAggregator favoriteColorResults) + { + var expectedPersons = personCache.Items.ToList(); + var expectedGroupings = personCache.Items.GroupBy(p => p.FavoriteColor).ToList(); + + // These should be subsets of each other + expectedPersons.Should().BeEquivalentTo(personResults.Data.Items); + favoriteColorResults.Groups.Count.Should().Be(expectedGroupings.Count); + + // Check each group + foreach (var grouping in expectedGroupings) + { + var color = grouping.Key; + var expectedGroup = grouping.ToList(); + var optionalGroup = favoriteColorResults.Groups.Lookup(color); + + optionalGroup.HasValue.Should().BeTrue(); + var actualGroup = optionalGroup.Value.Data.Items.ToList(); + + expectedGroup.Should().BeEquivalentTo(actualGroup); + } + } + + private static IObservable CreateFavoriteColorObservable(Person person, string key) => + person.WhenPropertyChanged(p => p.FavoriteColor).Select(change => change.Value); +} diff --git a/src/DynamicData.Tests/Cache/GroupOnPropertyFixture.cs b/src/DynamicData.Tests/Cache/GroupOnPropertyFixture.cs index fe77c1392..89c84e8fd 100644 --- a/src/DynamicData.Tests/Cache/GroupOnPropertyFixture.cs +++ b/src/DynamicData.Tests/Cache/GroupOnPropertyFixture.cs @@ -3,7 +3,6 @@ using DynamicData.Kernel; using DynamicData.Tests.Domain; - using FluentAssertions; using Xunit; @@ -12,7 +11,7 @@ namespace DynamicData.Tests.Cache; public class GroupOnPropertyFixture : IDisposable { - private readonly ChangeSetAggregator, int> _results; + private readonly GroupChangeSetAggregator _results; private readonly SourceCache _source; diff --git a/src/DynamicData.Tests/Domain/Fakers.cs b/src/DynamicData.Tests/Domain/Fakers.cs index 7bf51eeec..f1af3e267 100644 --- a/src/DynamicData.Tests/Domain/Fakers.cs +++ b/src/DynamicData.Tests/Domain/Fakers.cs @@ -1,4 +1,6 @@ -using Bogus; +using System; +using System.Linq; +using Bogus; using DynamicData.Tests.Utilities; namespace DynamicData.Tests.Domain; @@ -30,6 +32,8 @@ internal static class Fakers ["Parakeet", "Cockatoo", "Parrot", "Finch", "Conure", "Lovebird", "Cockatiel"], ]; + private static readonly string[] PersonGenders = ["F", "M", "O"]; + public static Faker Animal { get; } = new Faker() .CustomInstantiator(faker => @@ -47,6 +51,12 @@ internal static class Fakers public static Faker Market { get; } = new Faker().CustomInstantiator(faker => new Market($"{faker.Commerce.ProductName()} Id#{faker.Random.AlphaNumeric(5)}")); + public static Faker Person { get; } = new Faker().CustomInstantiator(faker => + new Person(faker.Person.FullName, faker.Random.Int(1, 100), faker.PickRandom(PersonGenders)) + { + FavoriteColor = faker.Random.RandomColor() + }); + public static Faker WithInitialAnimals(this Faker existing, Faker animalFaker, int minCount, int maxCount) => existing.FinishWith((faker, owner) => owner.Animals.AddRange(animalFaker.GenerateBetween(minCount, maxCount))); @@ -61,6 +71,12 @@ public static AnimalOwner AddAnimals(this AnimalOwner owner, Faker anima public static AnimalOwner AddAnimals(this AnimalOwner owner, Faker animalFaker, int count) => owner.With(o => o.Animals.AddRange(animalFaker.Generate(count))); + + public static Color RandomColor(this Randomizer rand) => + rand.EnumValues(1, Color.NotSpecified)[0]; + + public static Color RandomColor(this Randomizer rand, Color current) => + rand.EnumValues(1, Color.NotSpecified, current)[0]; } internal static class FakerExtensions diff --git a/src/DynamicData.Tests/Domain/Person.cs b/src/DynamicData.Tests/Domain/Person.cs index 67a1ed03d..914172475 100644 --- a/src/DynamicData.Tests/Domain/Person.cs +++ b/src/DynamicData.Tests/Domain/Person.cs @@ -5,10 +5,23 @@ namespace DynamicData.Tests.Domain; +public enum Color +{ + NotSpecified, + Red, + Orange, + Yellow, + Green, + Blue, + Indigo, + Violet, +} + public class Person : AbstractNotifyPropertyChanged, IEquatable { private int _age; private int? _ageNullable; + private Color _favoriteColor; public Person() : this("unknown", 0, "none") @@ -36,6 +49,15 @@ public Person(string name, int? age, string gender = "F", string? parentName = n ParentName = parentName ?? string.Empty; } + private Person(string name, int? age, string gender, Person personCopyKey) + { + Name = name; + _ageNullable = age; + Gender = gender; + ParentName = personCopyKey?.ParentName ?? throw new ArgumentNullException(nameof(personCopyKey)); + UniqueKey = personCopyKey.UniqueKey; + } + public static IEqualityComparer AgeComparer { get; } = new AgeEqualityComparer(); public static IEqualityComparer NameAgeGenderComparer { get; } = new NameAgeGenderEqualityComparer(); @@ -52,10 +74,18 @@ public int? AgeNullable set => SetAndRaise(ref _ageNullable, value); } + public Color FavoriteColor + { + get => _favoriteColor; + set => SetAndRaise(ref _favoriteColor, value); + } + public string Gender { get; } public string Key => Name; + public string UniqueKey { get; } = Guid.NewGuid().ToString("B"); + public string Name { get; } public string ParentName { get; } @@ -64,6 +94,12 @@ public int? AgeNullable public static bool operator !=(Person left, Person right) => !Equals(left, right); + public static Person CloneUniqueId(Person sourceData, Person sourceId) => + new((sourceData ?? throw new ArgumentNullException(nameof(sourceData))).Name, sourceData.Age, sourceData.Gender, sourceId) + { + FavoriteColor = sourceData.FavoriteColor + }; + public bool Equals(Person? other) { if (other is null) diff --git a/src/DynamicData/Cache/Internal/DynamicGrouper.cs b/src/DynamicData/Cache/Internal/DynamicGrouper.cs new file mode 100644 index 000000000..79a013fa6 --- /dev/null +++ b/src/DynamicData/Cache/Internal/DynamicGrouper.cs @@ -0,0 +1,226 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Diagnostics; +using DynamicData.Kernel; + +namespace DynamicData.Cache.Internal; + +internal sealed class DynamicGrouper(Func? groupSelector = null) : IDisposable + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull +{ + private readonly ChangeAwareCache, TGroupKey> _groupCache = new(); + private readonly Dictionary _groupKeys = []; + private readonly HashSet> _emptyGroups = []; + private readonly Func? _groupSelector = groupSelector; + + public void AddOrUpdate(TKey key, TGroupKey groupKey, TObject item, IObserver>? observer = null) + { + PerformAddOrUpdate(key, groupKey, item); + + if (observer != null) + { + EmitChanges(observer); + } + } + + public void ProcessChangeSet(IChangeSet changeSet, IObserver>? observer = null) + { + foreach (var change in changeSet.ToConcreteType()) + { + switch (change.Reason) + { + case ChangeReason.Add when _groupSelector is not null: + PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current); + break; + + case ChangeReason.Remove: + PerformRemove(change.Key); + break; + + case ChangeReason.Update when _groupSelector is not null: + PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current); + break; + + case ChangeReason.Update: + PerformUpdate(change.Key); + break; + + case ChangeReason.Refresh when _groupSelector is not null: + PerformRefresh(change.Key, _groupSelector(change.Current, change.Key), change.Current); + break; + + case ChangeReason.Refresh: + PerformRefresh(change.Key); + break; + } + } + + if (observer != null) + { + EmitChanges(observer); + } + } + + public void EmitChanges(IObserver> observer) + { + // Verify logic doesn't capture any non-empty groups + Debug.Assert(_emptyGroups.All(static group => group.Cache.Count == 0), "Non empty Group in Empty Group HashSet"); + + // Dispose/Remove any empty groups + foreach (var group in _emptyGroups) + { + if (group.Count == 0) + { + _groupCache.Remove(group.Key); + group.Dispose(); + } + } + + _emptyGroups.Clear(); + + // Make sure no empty ones were missed + Debug.Assert(!_groupCache.Items.Any(static group => group.Cache.Count == 0), "Not all empty Groups were removed"); + + // Emit any pending changes + var changeSet = _groupCache.CaptureChanges(); + if (changeSet.Count != 0) + { + observer.OnNext(new GroupChangeSet(changeSet)); + } + } + + public void Dispose() => _groupCache.Items.ForEach(group => (group as ManagedGroup)?.Dispose()); + + private static void PerformGroupRefresh(TKey key, in Optional> optionalGroup) + { + if (optionalGroup.HasValue) + { + optionalGroup.Value.Update(updater => updater.Refresh(key)); + } + else + { + Debug.Fail("Should not receive a refresh for an unknown Group Key"); + } + } + + private Optional> LookupGroup(TKey key) => + _groupKeys.Lookup(key).Convert(LookupGroup); + + private Optional> LookupGroup(TGroupKey groupKey) => + _groupCache.Lookup(groupKey).Convert(static grp => (grp as ManagedGroup)!); + + private ManagedGroup GetOrAddGroup(TGroupKey groupKey) => + LookupGroup(groupKey).ValueOr(() => + { + var newGroup = new ManagedGroup(groupKey); + _groupCache.Add(newGroup, groupKey); + return newGroup; + }); + + private void PerformAddOrUpdate(TKey key, TGroupKey groupKey, TObject item) + { + // See if this item already has been grouped + if (_groupKeys.TryGetValue(key, out var currentGroupKey)) + { + // See if the key has changed + if (EqualityComparer.Default.Equals(groupKey, currentGroupKey)) + { + // GroupKey did not change, so just update the value in the group + var optionalGroup = LookupGroup(currentGroupKey); + if (optionalGroup.HasValue) + { + optionalGroup.Value.Update(updater => updater.AddOrUpdate(item, key)); + return; + } + + Debug.Fail("If there is a GroupKey associated with a Key, the Group for that GroupKey should exist."); + } + else + { + // GroupKey changed, so remove from old and allow to be added below + PerformRemove(key, currentGroupKey); + } + } + + // Find the right group and add the item + PerformGroupAddOrUpdate(key, groupKey, item); + } + + private void PerformGroupAddOrUpdate(TKey key, TGroupKey groupKey, TObject item) + { + var group = GetOrAddGroup(groupKey); + group.Update(updater => updater.AddOrUpdate(item, key)); + _groupKeys[key] = groupKey; + + // Can't be empty since a value was just added + _emptyGroups.Remove(group); + } + + private void PerformRefresh(TKey key) => PerformGroupRefresh(key, LookupGroup(key)); + + // When the GroupKey is available, check then and move the group if it changed + private void PerformRefresh(TKey key, TGroupKey newGroupKey, TObject item) + { + if (_groupKeys.TryGetValue(key, out var groupKey)) + { + // See if the key has changed + if (EqualityComparer.Default.Equals(newGroupKey, groupKey)) + { + // GroupKey did not change, so just refresh the value in the group + PerformGroupRefresh(key, LookupGroup(groupKey)); + } + else + { + // GroupKey changed, so remove from old and add to new + PerformRemove(key, groupKey); + PerformGroupAddOrUpdate(key, newGroupKey, item); + } + } + else + { + Debug.Fail("Should not receive a refresh for an unknown key"); + } + } + + private void PerformRemove(TKey key) + { + if (_groupKeys.TryGetValue(key, out var groupKey)) + { + PerformRemove(key, groupKey); + _groupKeys.Remove(key); + } + else + { + Debug.Fail("Should not receive a Remove Event for an unknown key"); + } + } + + private void PerformRemove(TKey key, TGroupKey groupKey) + { + var optionalGroup = LookupGroup(groupKey); + if (optionalGroup.HasValue) + { + var currentGroup = optionalGroup.Value; + currentGroup.Update(updater => + { + updater.Remove(key); + if (updater.Count == 0) + { + _emptyGroups.Add(currentGroup); + } + }); + } + else + { + Debug.Fail("Should not receive a Remove Event for an unknown Group Key"); + } + } + + // Without the new group key, all that can be done is remove the old value + // Consumer of the Grouper is resonsible for Adding the New Value. + private void PerformUpdate(TKey key) => PerformRemove(key); +} diff --git a/src/DynamicData/Cache/Internal/GroupOnObservable.cs b/src/DynamicData/Cache/Internal/GroupOnObservable.cs new file mode 100644 index 000000000..88967d67a --- /dev/null +++ b/src/DynamicData/Cache/Internal/GroupOnObservable.cs @@ -0,0 +1,60 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Disposables; +using System.Reactive.Linq; +using DynamicData.Internal; + +namespace DynamicData.Cache.Internal; + +internal sealed class GroupOnObservable(IObservable> source, Func> selectGroup) + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull +{ + public IObservable> Run() => Observable.Create>(observer => + { + var grouper = new DynamicGrouper(); + var locker = new object(); + var parentUpdate = false; + + IObservable CreateGroupObservable(TObject item, TKey key) => + selectGroup(item, key) + .DistinctUntilChanged() + .Synchronize(locker!) + .Do( + onNext: groupKey => grouper!.AddOrUpdate(key, groupKey, item, !parentUpdate ? observer : null), + onError: observer.OnError); + + // Create a shared connection to the source + var shared = source + .Synchronize(locker) + .Do(_ => parentUpdate = true) + .Publish(); + + // First process the changesets + var subChanges = shared + .SubscribeSafe( + onNext: changeSet => grouper.ProcessChangeSet(changeSet), + onError: observer.OnError); + + // Next process the Grouping observables created for each item + var subMergeMany = shared + .MergeMany(CreateGroupObservable) + .SubscribeSafe(onError: observer.OnError); + + // Finally, emit the results + var subResults = shared + .SubscribeSafe( + onNext: _ => + { + grouper.EmitChanges(observer); + parentUpdate = false; + }, + onError: observer.OnError, + onComplete: observer.OnCompleted); + + return new CompositeDisposable(shared.Connect(), subMergeMany, subChanges, grouper); + }); +} diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index a31448745..d3516b89f 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -1973,6 +1973,45 @@ public static IObservable> Group(source, groupSelectorKey, regrouper).Run(); } + /// + /// Groups the source by the latest value from their observable created by the given factory. + /// + /// The type of the object. + /// The type of the key. + /// The type of the group key. + /// The source. + /// The group selector key. + /// An observable which will emit group change sets. + public static IObservable> GroupOnObservable(this IObservable> source, Func> groupObservableSelector) + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + groupObservableSelector.ThrowArgumentNullExceptionIfNull(nameof(groupObservableSelector)); + + return new GroupOnObservable(source, groupObservableSelector).Run(); + } + + /// + /// Groups the source by the latest value from their observable created by the given factory. + /// + /// The type of the object. + /// The type of the key. + /// The type of the group key. + /// The source. + /// The group selector key. + /// An observable which will emit group change sets. + public static IObservable> GroupOnObservable(this IObservable> source, Func> groupObservableSelector) + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull + { + groupObservableSelector.ThrowArgumentNullExceptionIfNull(nameof(groupObservableSelector)); + + return source.GroupOnObservable((obj, _) => groupObservableSelector(obj)); + } + /// /// Groups the source using the property specified by the property selector. Groups are re-applied when the property value changed. /// When there are likely to be a large number of group property changes specify a throttle to improve performance. diff --git a/src/DynamicData/Cache/Tests/GroupChangeSetAggregator.cs b/src/DynamicData/Cache/Tests/GroupChangeSetAggregator.cs new file mode 100644 index 000000000..87037c4eb --- /dev/null +++ b/src/DynamicData/Cache/Tests/GroupChangeSetAggregator.cs @@ -0,0 +1,105 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Disposables; +using System.Reactive.Linq; + +using DynamicData.Diagnostics; + +namespace DynamicData.Tests; + +/// +/// Aggregates all events and statistics for a group change set to help assertions when testing. +/// +/// The type of the object. +/// The type of the key. +/// The type of the grouping key. +public class GroupChangeSetAggregator : IDisposable + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull +{ + private readonly CompositeDisposable _compositeDisposable; + private readonly List> _messages = []; + private bool _disposedValue; + + /// + /// Initializes a new instance of the class. + /// + /// The source. + public GroupChangeSetAggregator(IObservable> source) + { + var published = source.Publish(); + + Data = published.AsObservableCache(); + Groups = published.Transform(grp => grp.Cache.Connect().AsAggregator()).DisposeMany().AsObservableCache(); + var results = published.Subscribe(_messages.Add, ex => Error = ex, () => IsCompleted = true); + var summariser = published.CollectUpdateStats().Subscribe(summary => Summary = summary, static _ => { }); + + _compositeDisposable = new(Data, Groups, results, summariser, published.Connect()); + } + + /// + /// Gets the data. + /// + /// + /// The data. + /// + public IObservableCache, TGroupKey> Data { get; } + + /// Gets a cache containing the aggregated results of each individual group. + public IObservableCache, TGroupKey> Groups { get; } + + /// + /// Gets the error. + /// + /// + /// The error. + /// + public Exception? Error { get; private set; } + + /// + /// Gets a value indicating whether or not the ChangeSet fired OnCompleted. + /// + /// + /// Boolean Value. + /// + public bool IsCompleted { get; private set; } + + /// + /// Gets a list of the messages that were received. + /// + public IReadOnlyList> Messages => _messages; + + /// + /// Gets the summary. + /// + /// + /// The summary. + /// + public ChangeSummary Summary { get; private set; } = ChangeSummary.Empty; + + /// + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + /// + /// Disposes of managed and unmanaged responses. + /// + /// If being called by the Dispose method. + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + _ = disposing; + _compositeDisposable.Dispose(); + _disposedValue = true; + using var cleanup = Groups.Connect().DisposeMany().Subscribe(); + } + } +} diff --git a/src/DynamicData/Cache/Tests/TestEx.cs b/src/DynamicData/Cache/Tests/TestEx.cs index b3e73db1e..e3581b14d 100644 --- a/src/DynamicData/Cache/Tests/TestEx.cs +++ b/src/DynamicData/Cache/Tests/TestEx.cs @@ -36,6 +36,25 @@ public static DistinctChangeSetAggregator AsAggregator(this IObs return new DistinctChangeSetAggregator(source); } + /// + /// Aggregates all events and statistics for a group change set to help assertions when testing. + /// + /// The type of the value. + /// The type of the key. + /// The type of the grouping key. + /// The source. + /// The distinct change set aggregator. + /// source. + public static GroupChangeSetAggregator AsAggregator(this IObservable> source) + where TValue : notnull + where TKey : notnull + where TGroupKey : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + + return new GroupChangeSetAggregator(source); + } + /// /// Aggregates all events and statistics for a sorted change set to help assertions when testing. ///