Вот реализация оператора CountLiveStreams
:
public static IObservable<int> CountLiveStreams<T>(
this IObservable<IObservable<T>> streamOfStreams)
{
return streamOfStreams
.Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
.Prepend(1).Append(-1))
.Merge()
.Scan(0, (accumulator, delta) => accumulator + delta)
.Prepend(0);
}
Каждый переданный поток преобразуется в IObservable<int>
, который выдает 2 значения, значение 1 в начале и -1 в конце . Затем все пары значений, сгенерированные из всех потоков, объединяются в один IObservable<int>
, и, наконец, все эти числа суммируются с помощью оператора Scan
.