Как динамически объединить n наблюдаемых в список? - PullRequest
6 голосов
/ 25 июля 2011

У меня есть коллекция наблюдаемых, которые генерируют изменения состояния для так называемого Channel. И у меня есть ChannelSet, который должен контролировать эти каналы.

Я хотел бы написать что-то вроде этого: если один канал работает, набор каналов работает, в противном случае набор каналов не работает.

IEnumerable<ChannelState> channelStates = ...;
if (channelStates.Any(cs => cs == ChannelState.Operational))
    channelSet.ChannelSetState = ChannelSetState.Up;
else
    channelSet.ChannelSetState = ChannelSetState.Down;

Но где мне взять мой IEnumerable<ChannelState>? Если у меня есть 1 канал, я могу просто подписаться на изменения его состояния и соответственно изменить состояние набора каналов. Для двух каналов я мог бы использовать CombineLatest:

Observable.CombineLatest(channel0States, channel1States, (cs0, cs1) =>
    {
        if (cs0 == ChannelSetState.Up || cs1 == ChannelSetState.Up)
            return ChannelSetState.Up;
        else
            return ChannelSetState.Down;
    });

Но у меня есть IEnumerable<Channel> и соответствующий IEnumerable<IObservable<ChannelState>>. Я ищу что-то вроде CombineLatest, которое не ограничено фиксированным количеством наблюдаемых.

Чтобы усложнить ситуацию, можно добавить и удалить коллекцию каналов. Так что время от времени, например, будет добавлен канал. Новый канал также генерирует изменения состояния, которые необходимо включить.

Итак, что я на самом деле ищу, так это функцию:

IEnumerable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

, который обновляется при изменении ввода. Должен быть какой-то способ сделать это с помощью Rx, но я не могу понять, как это сделать.

Ответы [ 3 ]

3 голосов
/ 26 июля 2011

Существует довольно простой способ сделать то, что вы хотите, с помощью Rx, но вам нужно думать только с точки зрения наблюдаемых и не смешивать с перечисляемыми.

Сигнатура функции, которую вам действительно нужно мыслить в терминахof is:

IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

Вот функция:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

Важно, чтобы каждый IObservable<ChannelState> в IObservable<IObservable<ChannelState>> вел себя правильно, чтобы это работало.

Я предполагал, что перечисление ChannelState имеет состояние Idle и что каждый IObservable<ChannelState> будет выдавать ноль или более пар значений Operational / Idle (Operational, за которыми следует Idle) до завершения.

Также вы сказали, что «набор каналов может быть добавлен и удален из» - думая с точки зрения IEnumerable<IObservable<ChannelState>>, это звучит разумно - но в Rx вам не нужно беспокоиться об удалениях, потому что каждый наблюдаемыйможет сигнализировать о своем завершении.Как только он сигнализирует о завершении, он как будто был удален из коллекции, потому что не может генерировать дальнейшие значения.Так что вам нужно беспокоиться только о добавлении в коллекцию - это легко с использованием предметов.

Так что теперь функцию можно использовать так:

var channelStatesSubject = new Subject<IObservable<ChannelState>>();
var channelStates = channelStatesSubject.AsObservable();
var channelSetStates = f(channelStates);

channelSetStates.Subscribe(css => { /* ChannelSetState subscription code */ });

channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
// etc

Я запустил это, используя некоторый тестовый код, который использовал три случайных ChannelState наблюдаемых с вызовом Do в функции f для отладки и получил следующую последовательность:

1
Up
2
3
2
1
2
1
0
Down
1
Up
0
Down

Я думаю, это то, что вам нужно.Дайте мне знать, если я что-то пропустил.


Согласно комментариям ниже, перечисление ChannelState имеет несколько состояний, но только Operational означает, что соединение установлено.Так что очень просто добавить оператор DistinctUntilChanged, чтобы скрыть несколько состояний "вниз".Вот код теперь:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

Добавлен код, чтобы первый запрос на выборку всегда начинался с 1.Вот код сейчас:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .StartWith(1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();
1 голос
/ 26 июля 2011

Я обещал добавить решение, которое придумал сам, так что вот оно. Пока я не нашел ничего лучшего, я буду использовать это, хотя я все еще думаю, что должен быть лучший способ:)

Я использую класс, который использует ConcurrentDictionary, чтобы сохранить последнее значение из каждой зарегистрированной наблюдаемой. Когда наблюдаемая становится незарегистрированной, ее последнее значение снова удаляется, а также подписка, связанная с ней.

Когда какое-либо зарегистрированное наблюдаемое генерирует значение, все последние значения собираются и отправляются в Subject.

public class DynamicCombineLatest<T>
{
    private readonly IDictionary<IObservable<T>, T> _latestValues =
        new ConcurrentDictionary<IObservable<T>, T>();
    private readonly IDictionary<IObservable<T>, IDisposable> _subscriptions =
        new ConcurrentDictionary<IObservable<T>, IDisposable>();
    private readonly ISubject<IEnumerable<T>> _result =
        new Subject<IEnumerable<T>>();

    public void AddObservable(IObservable<T> observable)
    {
        var subscription =
            observable.Subscribe(t =>
                                 {
                                     _latestValues[observable] = t;
                                     _result.OnNext(_latestValues.Values);
                                 });
        _subscriptions[observable] = subscription;
    }

    public void RemoveObservable(IObservable<T> observable)
    {
        _subscriptions[observable].Dispose();
        _latestValues.Remove(observable);
        _subscriptions.Remove(observable);
    }

    public IObservable<IEnumerable<T>> Result
    {
        get { return _result; }
    }
}
1 голос
/ 26 июля 2011

Возможно начать с IObservable<Channel>, а не с w / IEnumerable<Channel>.Для этого можно использовать Subject<Channel>, а при создании нового OnNext() it.

Если вам нужен список,

xsChannels.Subscribe (item => {lock (list) {list.add (item);}});

...