Избегание перекрывающихся вызовов OnNext в Rx при использовании SubscribeOn (Scheduler.TaskPool) - PullRequest
11 голосов
/ 06 февраля 2012

У меня есть код, использующий Rx, который вызывается из нескольких потоков:

subject.OnNext(value); // where subject is Subject<T>

Я хочу, чтобы значения обрабатывались в фоновом режиме, поэтому моя подписка

subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value =>
{
    // use value
});

Меня не волнует, какие потоки обрабатывают значения, выходящие из Observable, если работа помещается в TaskPool и не блокирует текущий поток. Тем не менее, мое использование 'value' внутри моего делегата OnNext не является потокобезопасным. В настоящий момент, если через Observable проходит много значений, я получаю перекрывающиеся вызовы моего обработчика OnNext.

Я мог бы просто добавить блокировку для моего делегата OnNext, но это не похоже на способ работы Rx. Как лучше всего убедиться, что у меня есть только один вызов для моего обработчика OnNext за раз, когда у меня несколько потоков, вызывающих subject.OnNext(value);?

Ответы [ 4 ]

12 голосов
/ 15 августа 2012

С Использование тем на MSDN

По умолчанию субъекты не выполняют никакой синхронизации через потоки. [...] Однако, если вы хотите синхронизировать исходящие звонки с наблюдателями с помощью планировщика, вы можете использовать метод синхронизации для этого.

Таким образом, вы должны, как говорит Брэндон в комментариях, синхронизировать тему и передать ее своим продюсерам. например,

var syncSubject = Subject.Synchronize(subject);

// syncSubject.OnNext(value) can be used from multiple threads

subscription = syncSubject.ObserveOn(TaskPoolScheduler.Default).Subscribe(value =>
{
    // use value
});
6 голосов
/ 06 февраля 2012

Я думаю, что вы ищете метод расширения .Synchronize (). Чтобы улучшить производительность в последнем выпуске (конец 2011 г.), команда Rx ослабила предположения о последовательной природе наблюдаемых производителей последовательностей. Однако кажется, что вы нарушаете эти предположения (что не так уж и плохо), но чтобы возобновить воспроизведение Rx так, как ожидают пользователи, вам следует синхронизировать последовательность, чтобы убедиться, что она снова последовательная.

1 голос
/ 09 февраля 2012

Здесь немного больше объяснений, почему использовать Синхронизировать (второй абзац).С другой стороны, Synchronize может принять участие в тупике, если вы активно используете блокировку в своем коде, по крайней мере, я был свидетелем такой ситуации.

0 голосов
/ 06 февраля 2012

Вы можете попытаться реализовать свой собственный IObserver, который будет применять блокировку в своем методе OnNext.Просто простой декоратор: примените блокировку, вызовите внутренний OnNext, снимите блокировку.

Затем вы можете реализовать метод расширения в IObservable, что-то вроде .AsThreadSafe ().

...