У меня есть код, использующий Rx, который вызывается из нескольких потоков:
subject.OnNext(value); // where subject is Subject<T>
Я хочу, чтобы значения обрабатывались в фоновом режиме, поэтому моя подписка
subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value =>
{
// use value
});
Меня не волнует, какие потоки обрабатывают значения, выходящие из Observable, если работа помещается в TaskPool и не блокирует текущий поток. Тем не менее, мое использование 'value' внутри моего делегата OnNext не является потокобезопасным. В настоящий момент, если через Observable проходит много значений, я получаю перекрывающиеся вызовы моего обработчика OnNext.
Я мог бы просто добавить блокировку для моего делегата OnNext, но это не похоже на способ работы Rx. Как лучше всего убедиться, что у меня есть только один вызов для моего обработчика OnNext за раз, когда у меня несколько потоков, вызывающих subject.OnNext(value);
?