Как регулировать бесконечное количество наблюдаемых, чтобы поддерживать постоянное количество элементов в процессе? - PullRequest
0 голосов
/ 08 марта 2019

У меня есть бесконечная наблюдаемая, сгенерированная из периодической выборки перечисляемого:

    private static IObservable<Runjob> GetRunjobObservable(IEnumerable<Runjob> runjobs)
    {
        var observable = Observable.Create<Runjob>(obs =>
        {
            var timer = new System.Timers.Timer();
            timer.Interval = _interval;
            timer.Elapsed += (s, e) => obs.OnNext(runjobs.Sample(1, true).Single());
            return timer;
        });
        return observable;
    }

Я хотел бы подвергнуть этот поток рабочих заданий обработке в разных потоках, но с условием, что не более 10 рабочих заданийобрабатываются в любой момент времени.Не важно, чтобы обрабатывался каждый runjob в наблюдаемом - моя цель - запустить несколько процессов одновременно и на постоянной основе в целях стресс-тестирования.

У меня есть следующий код.Как мне изменить его так, чтобы в каждый момент времени обрабатывалось не более 10 рабочих заданий?

 var runjobObs = GetRunjobObservable(runjobs);
 var replies = runjobObs
            .SubscribeOn(new EventLoopScheduler(t => new Thread(t) {Name = _threadCounter++.ToString(), IsBackground = true}))
            .ObserveOn(Dispatcher.CurrentDispatcher)
            .Select(HandleRunjob);

Мой Rx немного ржавый, поэтому, если вы видите что-то не так с моим примером, пожалуйста, исправьте меня.

...