Существует довольно простой способ сделать то, что вы хотите, с помощью Rx, но вам нужно думать только с точки зрения наблюдаемых и не смешивать с перечисляемыми.
Сигнатура функции, которую вам действительно нужно мыслить в терминахof is:
IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>
Вот функция:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();
Важно, чтобы каждый IObservable<ChannelState>
в IObservable<IObservable<ChannelState>>
вел себя правильно, чтобы это работало.
Я предполагал, что перечисление ChannelState
имеет состояние Idle
и что каждый IObservable<ChannelState>
будет выдавать ноль или более пар значений Operational
/ Idle
(Operational
, за которыми следует Idle
) до завершения.
Также вы сказали, что «набор каналов может быть добавлен и удален из» - думая с точки зрения IEnumerable<IObservable<ChannelState>>
, это звучит разумно - но в Rx вам не нужно беспокоиться об удалениях, потому что каждый наблюдаемыйможет сигнализировать о своем завершении.Как только он сигнализирует о завершении, он как будто был удален из коллекции, потому что не может генерировать дальнейшие значения.Так что вам нужно беспокоиться только о добавлении в коллекцию - это легко с использованием предметов.
Так что теперь функцию можно использовать так:
var channelStatesSubject = new Subject<IObservable<ChannelState>>();
var channelStates = channelStatesSubject.AsObservable();
var channelSetStates = f(channelStates);
channelSetStates.Subscribe(css => { /* ChannelSetState subscription code */ });
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
// etc
Я запустил это, используя некоторый тестовый код, который использовал три случайных ChannelState
наблюдаемых с вызовом Do
в функции f
для отладки и получил следующую последовательность:
1
Up
2
3
2
1
2
1
0
Down
1
Up
0
Down
Я думаю, это то, что вам нужно.Дайте мне знать, если я что-то пропустил.
Согласно комментариям ниже, перечисление ChannelState
имеет несколько состояний, но только Operational
означает, что соединение установлено.Так что очень просто добавить оператор DistinctUntilChanged
, чтобы скрыть несколько состояний "вниз".Вот код теперь:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.DistinctUntilChanged()
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();
Добавлен код, чтобы первый запрос на выборку всегда начинался с 1
.Вот код сейчас:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.StartWith(1)
.DistinctUntilChanged()
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();