Как я могу наблюдать значения неблокирующим способом, используя Rx? - PullRequest
4 голосов
/ 17 июня 2011

Я пытаюсь наблюдать по таймеру, чей обработчик длиннее интервала.для этого я хочу запланировать наблюдение на каком-либо threadPool, пуле задач или чем-то подобном.

Я пробовал threadpool, taskpool и newthread, и ни один из них не работал.Кто-нибудь знает как это сделать?пример:

var disposable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100)).ObserveOn(Scheduler.NewThread).
    Subscribe(x =>
      {
      count++;
    Thread.Sleep(TimeSpan.FromMilliseconds(1000));
  });

  Thread.Sleep(TimeSpan.FromSeconds(5));
  disposable.Dispose();
  if (count > 10 )
  {
    //hurray...
  }

Ответы [ 3 ]

4 голосов
/ 17 июня 2011

То, что вы спрашиваете, является плохой идеей, потому что вы со временем исчерпаете доступные ресурсы (начиная с скорости создания потоков> скорости завершения потока).Вместо этого, почему бы вам не запланировать новый элемент, когда предыдущий будет закончен?

В вашем конкретном примере вам нужно передать IScheduler в Observable.Timer вместо того, чтобы пытаться использовать ObserveOn.

2 голосов
/ 24 июня 2011

Павел прав, когда говорит, что это плохая идея. Вы логически создаете ситуацию, когда действия в очереди могут привести к потере системных ресурсов. Вы даже можете обнаружить, что он работает на вашем компьютере, но не работает на компьютере клиента. Доступная память, 32- / 64-битный процессор и т. Д. Могут повлиять на код.

Однако ваш код легко изменить, чтобы он делал то, что вы хотите.

Сначала, однако, метод Timer будет правильно планировать события таймера, пока наблюдатель заканчивает работу до следующего запланированного события. Если наблюдатель еще не закончил, таймер будет ждать. Помните, что наблюдаемые таймеры являются «холодными» наблюдаемыми, поэтому для каждого подписанного наблюдателя существует новый наблюдаемый таймер. Это отношения один-к-одному.

Это предотвращает непреднамеренное использование ресурсов таймером.

Таким образом, поскольку ваш код в настоящее время определен, OnNext вызывается каждые 1000 миллисекунд, а не каждые 100.

Теперь, чтобы код выполнялся с интервалом в 100 миллисекунд, сделайте следующее:

Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
    .Select(x => Scheduler.NewThread.Schedule(() =>
    {
        count++;
        Thread.Sleep(TimeSpan.FromMilliseconds(1000));
    }))
    .Subscribe(x => { });

Фактически этот код представляет собой IObservable<IDisposable>, где каждое одноразовое использование - это запланированное действие, выполнение которого занимает 1000 миллисекунд.

В моих тестах это работало хорошо и увеличивало счет правильно.

Я попытался использовать свои ресурсы и обнаружил, что, установив таймер на запуск раз в миллисекунду, я быстро получил System.OutOfMemoryException, но обнаружил, что код запускается, если я изменяю настройку на каждые две миллисекунды. Это, однако, использовало более 500 МБ ОЗУ, в то время как код работал и создал около 500 новых потоков. Совсем не красиво.

Действуйте осторожно!

1 голос
/ 24 мая 2013

Если вы искренне, постоянно производите ценности быстрее, чем можете их использовать, то, как указывалось, вы идете к неприятностям.Если вы не можете замедлить темпы производства, вам нужно посмотреть, как их потреблять быстрее.Возможно, вы хотите, чтобы многопоточность наблюдателя использовала несколько ядер?

Если вы используете многопоточность наблюдателя, вам, возможно, следует быть осторожным при обработке событий не по порядку.Вы будете обрабатывать несколько уведомлений одновременно, и все ставки отключены относительно того, какая обработка выполняется в первую очередь (или сначала попадает в критическое состояние гонки).

Если вы не иметь для обработки каждого события в потоке, взглянем на пару различных реализаций ObserveLatestOn, которые плавают вокруг.Есть потоки, обсуждающие это здесь и здесь .

ObserveLatestOn отбросит все, кроме самого последнего уведомления, которое происходит, пока наблюдатель обрабатывает предыдущее уведомление.Когда наблюдатель завершит обработку предыдущего уведомления, он получит последнее уведомление и пропустит все уведомления, которые произошли между ними.

Преимущество этого состоит в том, что он предотвращает накопление давления со стороны производителя, который быстрее, чемего потребитель.Если потребитель медленнее из-за нагрузки, он будет только хуже, обрабатывая больше уведомлений.Удаление ненужных уведомлений может привести к тому, что нагрузка снизится до точки, в которой потребитель сможет не отставать.

...