Включение IObservable > - PullRequest
1 голос
/ 27 марта 2020

У меня есть следующая структура:

// source of data
interface IItem
{
    IObservable<string> Changed { get; }
}

interface IItemCollection
{
    List<IItem> Items { get; }
    IObservable<IItem> ItemAdded { get; }
    IObservable<IItem> ItemRemoved { get; }
}

interface IItemCollectionManager
{
    List<IItemCollection> ItemCollectionCollection { get; }
    IObservable<IItemCollection> ItemCollectionAdded { get; }
    IObservable<IItemCollection> ItemCollectionRemoved { get; }
}

// desired result
interface IAggregation
{
    IObservable<string> Changed { get; }
}

Цель здесь состоит в том, чтобы IAggregation раскрыл одну наблюдаемую. Однако IItem s можно добавлять и удалять из каждого IItemCollection в любое время, и, действительно, IItemCollection можно добавлять или удалять из IItemCollectionManager в любое время. Конечно, когда добавляется такой IItemCollection, Aggregation должен также выдавать значения из этого, и если ItemCollection удаляется, я больше не хочу string s из IItem s в этой коллекции быть выпущенным. Кроме того, когда Item добавляется к любому IItemCollection, значения из его Changed наблюдаемой должны также создавать значения из IAggregation Changed наблюдаемой.

Теперь это было довольно просто решить эту проблему, когда был только один IItemCollection, например, вот так:

class AggregationImpl : IAggregation 
{
    public AggregationImpl(IItemCollection itemCollection)
    {
        var added = itemCollection.ItemAdded
            .Select(_ => itemCollection.Items);
        var removed = itemCollection.ItemRemoved
            .Select(_ => itemCollection.Items);

        Changed = Observable.Merge(added, removed)
            .StartWith(itemCollection.Items)
            .Select(coll => coll.Select(item => item.Changed).Merge())
            .Switch();
    }

    public IObservable<string> Changed { get; }

}

... ключевой момент здесь заключается в том, что я сплющил все наблюдаемые Item Changed в один наблюдаемый с Merge(), а затем, каждый раз, когда элемент добавляется или удаляется, я воссоздаю весь Observable и использую Switch(), чтобы отписаться от старого и подписаться на новый`.

I Мне кажется, что расширение с включением IItemCollectionManager должно быть довольно простым, но я не совсем уверен, как к нему подойти.

1 Ответ

3 голосов
/ 29 марта 2020

Надеюсь, это сработает или, по крайней мере, установит правильный путь. Поскольку тестирование кажется довольно сложным, я собираюсь сделать это. Если у вас есть простой тестовый код, я буду рад протестировать.

Во-первых, мне не очень нравится ваша опубликованная реализация. Вы подключаетесь к наблюдаемым объектам ItemAdded и ItemRemoved, вообще не используя данные; вы получаете данные из свойства Items. Это может привести к состоянию гонки в плохой реализации, когда событие отправляется до обновления свойства. Поэтому я создал свою собственную реализацию. Я также рекомендую добавить его в метод расширения, потому что это облегчит жизнь позже:

public static IObservable<string> ToAggregatedObservable(this IItemCollection itemCollection)
{
    return Observable.Merge(
            itemCollection.ItemAdded.Select(item => (op: "+", item)),
            itemCollection.ItemRemoved.Select(item => (op: "-", item))
        )
        .Scan(ImmutableList<IItem>.Empty.AddRange(itemCollection.Items), (list, t) =>
            t.op == "+"
                ? list.Add(t.item)
                : list.Remove(t.item)
        )
        .Select(l => l.Select(item => item.Changed).Merge())
        .Switch();

}

Простите магическую строку c за секунду, вы можете превратить ее в enum, если хотите. Мы сохраняем состояние текущих элементов в ImmutableList внутри Scan. Когда элемент добавляется / удаляется, мы обновляем список, а затем переключаем наблюдаемое.

Этот же лог c можно применить к уровню менеджера коллекции:

public static IObservable<string> ToAggregatedObservable(this IItemCollectionManager itemCollectionManager)
{
    return Observable.Merge(
            itemCollectionManager.ItemCollectionAdded.Select(itemColl => (op: "+", itemColl)),
            itemCollectionManager.ItemCollectionRemoved.Select(itemColl => (op: "-", itemColl))
        )
        .Scan(ImmutableList<IItemCollection>.Empty.AddRange(itemCollectionManager.ItemCollectionCollection), (list, t) =>
            t.op == "+"
                ? list.Add(t.itemColl)
                : list.Remove(t.itemColl)
        )
        .Select(l => l.Select(itemColl => itemColl.ToAggregatedObservable()).Merge())
        .Switch();
}

Здесь мы Вы просто повторно используете первый метод расширения, используя то же самое добавление / удаление, затем переключаете logi c, как и раньше.

...