Replies: 2 comments 1 reply
-
I'm assuming your initialization logic is asynchronous, otherwise, I really don't understand where the problem is that needs solved. That is, if initialization is synchronous, then do it synchronously, and then synchronously setup your subscription afterwards. The same goes for the problem of combining multiple sources, since adding items is synchronous, regardless of how many sources you have. On the assumption that initialization is asynchronous, then, I think this would be my approach: using var itemsSource = new SourceCache<Item, Guid>(static item => item.Id);
using var subscription = Observable.CombineLatest(
first: itemsSource.CountChanged
.StartWith(itemsSource.Count),
second: itemsSource
.Connect()
.AutoRefresh(static item => item.HasInitialized)
.Filter(static item => item.HasInitialized)
.ForAggregation()
.Count(),
resultSelector: static (itemCount, initializedCount) => (itemCount is not 0) && (itemCount == initializedCount))
.DistinctUntilChanged()
.Where(static hasFullyInitialized => hasFullyInitialized)
.Take(1)
.Select(_ => itemsSource
.Connect()
.AutoRefresh()
.ObserveOn(RxApp.MainThreadScheduler)
.ToCollection())
.Subscribe(items =>
{
// Do expensive stuff
});
itemsSource.AddOrUpdate(
[
new Item(),
new Item(),
new Item()
]);
var delay = TimeSpan.FromSeconds(1);
foreach (var item in itemsSource.Items)
{
await item.InitializeAsync(delay);
delay += TimeSpan.FromSeconds(1);
} public class Item
: INotifyPropertyChanged
{
public Item()
=> _id = Guid.NewGuid();
public Guid Id
=> _id;
public bool HasInitialized
{
get => _hasInitialized;
private set
{
if (value != _hasInitialized)
return;
_hasInitialized = value;
PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(HasInitialized)));
}
}
public event PropertyChangedEventHandler? PropertyChanged;
public async Task InitializeAsync(TimeSpan delay)
{
await Task.Delay(delay);
_hasInitialized = true;
}
private bool _hasInitialized;
private Guid _id;
} |
Beta Was this translation helpful? Give feedback.
-
I'm not actually sure how that compiles in your example. |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
Let's say I have several dynamic data collections, each containing possibly a large number of items. These items are in separate collections for ease of lookup (e.g. if I know what "kind" of item I want, I have to look through fewer items like this). However, some crucial processing must be done on all of the items. For this purpose, I merge all my dynamic data collections into one observable changeset (on which I apply several rx and dynamic data operators etc.). The final action that is performed when I
Subscribe()
at the end is potentially expensive, but when the app is running normally it needs to happen in response to a variety of changes. However, during initialization, I know that many of these changes will happen all at once or in rapid succession. I would like to avoid this expensive operation getting trigger a million times. I want it to occur only one time after all data is initialized, and after that it should happen every time the relevant changes occur.The obvious solution is to subscribe only after initialization is complete. My issue is finding out WHEN the initialization is complete. To illustrate, here is some code:
My issue happens in the place where
IsEverythingInitialized
is set to true. It seems that that piece of code can be reached before_myChangeSet
is done receiving all the transformed items. This means that a situation can happen (usually when there are thousands of items) when theWhere(items => items.All(x => x.IsInitialized))
succeeds butitems
does not contain all items yet, and some of the missing items are not initialized yet. This means that my final subscription happens "too early" and the handler on it will get triggered a few hundred times as the remaining items go through their initialization.I know that due to the "callback-like" nature of rx in general, this is not an easy problem to solve. Since
_myChangeSet
is a stream of changes, which I want to keep "open" because I want to use it to listen to future changes on the individual items, it's not easy to recognize when the "initial adding of items" is done.In my case, I actually know the number of all the items once the source collections are filled. I could use that to determine if all items are already in
_myChangeSet
like this:But this feels a bit like a hack. Is there some better way how to do this or am I going about this the wrong way in general?
Beta Was this translation helpful? Give feedback.
All reactions