Подсчитать количество одновременных потоков в реальном времени - PullRequest
3 голосов
/ 19 июня 2020

У меня есть поток потоков. Я хотел бы знать, чтобы преобразовать это в живое подсчет многих внутренних потоков, которые в настоящее время работают, т.е. не завершены или не содержат ошибок.

Как мне реализовать CountLiveStreams?

var source = new Subject<IObservable<Unit>>();
IObservable<int> count = source.CountLiveStreams();

Спасибо!

Ответы [ 2 ]

5 голосов
/ 19 июня 2020

Вот реализация оператора CountLiveStreams:

public static IObservable<int> CountLiveStreams<T>(
    this IObservable<IObservable<T>> streamOfStreams)
{
    return streamOfStreams
        .Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
            .Prepend(1).Append(-1))
        .Merge()
        .Scan(0, (accumulator, delta) => accumulator + delta)
        .Prepend(0);
}

Каждый переданный поток преобразуется в IObservable<int>, который выдает 2 значения, значение 1 в начале и -1 в конце . Затем все пары значений, сгенерированные из всех потоков, объединяются в один IObservable<int>, и, наконец, все эти числа суммируются с помощью оператора Scan.

2 голосов
/ 21 июня 2020

Это работает для меня:

public static IObservable<int> CountLiveStreams<T>(this IObservable<IObservable<T>> source) =>
    source
        .SelectMany(xs =>
            xs
                .Materialize()
                .Where(x => x.Kind != NotificationKind.OnNext)
                .Select(x => -1)
                .StartWith(1))
        .Scan((x, y) => x + y);

Он использует ту же стратегию, что и ответ Теодора: создание 1 и -1, а затем использование .Scan для создания текущего счета, но я думаю, что это яснее звонком на .Materialize().

...