Как я могу сделать свою реализацию IObservable <T>многопоточным? - PullRequest
2 голосов
/ 28 января 2011

Я написал реализацию, основанную на примерах в [Rx DEVHOL202] и http: //rxwiki.wikidot.com/101samples#toc48

Вот мой код.http://csharp.pastebin.com/pm2NAPx6

  1. Это работает, но вызовы OnNext не являются неблокирующими, что я хотел бы реализовать, чтобы симулировать чтение по сети и асинхронную передачу каждого куска байтов в качествечитается обработчику [который не показан здесь полностью, но может кэшировать результаты и выполнять дальнейшую обработку].

    Какой хороший способ сделать это?

  2. Как только исключение выдается, все последующие OnNext () не обрабатываются !!Если я не буду явно выходить из цикла и указывать завершение.Почему это так?

Ответы [ 2 ]

2 голосов
/ 28 января 2011

Я бы настоятельно рекомендовал против , пытаясь реализовать свой собственный IObservable. Неявные правила выходят за рамки безопасности потоков и упорядочивают вызовы методов.

Обычно вы возвращаете IObservable экземпляров из методов, но если вам нужен класс, который реализует его напрямую, вы должны обернуть Subject:

public class SomeObservable<T> : IObservable<T>
{
        private Subject<T> subject = new Subject<T>();

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subject.Subscribe(observer);
        }
}

1. Вы должны быть осторожны с тем, как вы поддерживаете это от своего наблюдателя (поскольку у вас могут быть общие данные), но вы можете сделать обработку вызовов асинхронной одним из двух способов:

  • Позвоните ObserveOn(Scheduler.TaskPool) (или ThreadPool, если вы не достигли 4.0), прежде чем позвонить Subscribe. Это приводит к тому, что сообщения маршрутизируются через планировщик (в данном случае, задачу)
  • Передайте IScheduler конструктору Observer
  • Запуск асинхронной задачи / потока от вашего подписчика

2. Это ожидаемая функциональность. Контракт между IObservable и IObserver составляет (OnNext)* (OnCompleted | OnError)?, то есть «ноль или более вызовов OnNext, за которыми, возможно, следует EITHER OnCompleted или OnError». После OnCompleted|OnError вызов OnNext недопустим.

Все операторы в Rx (Where, Select и т. Д.) Применяют это правило, даже если источник этого не делает.

0 голосов
/ 28 января 2011

Я не уверен, правильно ли я понимаю ваш вопрос, но почему вы не можете просто выполнить ту логику, которая у вас есть, на другом Thread, или если она достаточно мала, нажмите на ThreadPool?

Вот пример:

ThreadPool.QueueUserWorkItem(o=>
{
    _paidSubj.OnNext(this); // Raise PAID event 
});

Я запутался в типе данных на Subject, я никогда не видел этот класс в C # ... это то, что вы создали?OnNext - это событие, которое вызывается, или это просто метод?Если OnNext является событием, то вы можете использовать BeginInvoke для его асинхронного вызова:

_paidSubj.OnNext.BeginInvoke(this, null, null);

Обновление:

Важная вещь, которая произойдет, еслиВы реализуете этот тип асинхронного поведения: если вы уведомите IObserver, передав Order, у вас могут возникнуть некоторые несоответствия при попытке прочитать данные в наблюдателе (а именно, в буфере заказа), в то время как Порядок продолжает изменятьсябуфер в его потоке Read.Таким образом, есть как минимум два способа решить эту проблему:

  1. Ограничить доступ к памяти, которая будет изменена с помощью блокировок.
  2. Только уведомлять наблюдателя с соответствующей информацией, которую вы хотитеэто увидеть:
    а.Передавая информацию как значение (не как ссылку).
    b.Создавая неизменную структуру, которая передает информацию.

PS Откуда вы взяли Subject?Subject должен быть OrderObserver?

...