Правильная обработка Обратного давления и Параллелизма с RxJava2 - PullRequest
0 голосов
/ 23 мая 2018

У меня есть служба, с которой я зарегистрировал обратный звонок, и теперь я хочу выставить его как Flowable с определенными требованиями / ограничениями:

  1. Поток, получающий обратный вызов, не должен блокироваться (работа должна быть передана другому потоку / планировщику, указанному наблюдателем)
  2. Не должно быть никаких исключений из-за замедления потока потребителей
  3. Несколько потребителей могут подписаться на него независимо отдруг друга
  4. потребители могут выбрать для буферизации все элементы, чтобы ни один из них не был потерян, однако их не следует буферизовать в классе «продюсер»

Ниже приведено то, что у меня естьв настоящее время

class MyBroadcaster {
    private PublishProcessor<Packet> packets = PublishProcessor.create();
    private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();

    public MyBroadcaster() {
       //this is actually different to my exact use but same conceptually
       registerCallback(packets::onNext);
    }

    public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
        return backpressuredPackets.observeOn(scheduler);
    }
}

Я не уверен, соответствует ли это моим требованиям.На javadoc onBackpressureLatest есть примечание, касающееся observeOn, которое я не понимаю:

Обратите внимание, что из-за характера того, как запросы противодавления распространяются через подписку на / подписать, запрашивая более 1от downstream не гарантирует непрерывную доставку событий onNext

И у меня есть другие вопросы:

  • Делает ли вызов onBackpressureLatest так, что элементы небольше многоадресной рассылки?
  • Как я могу проверить свои требования?
  • Бонус: если у меня несколько таких издателей (в одном классе или в другом месте), каков наилучший способ сделать один и тот же шаблон многоразовым.Создать мой собственный Flowable с делегированием / дополнительными методами?

1 Ответ

0 голосов
/ 23 мая 2018

Я не уверен, что это действительно соответствует моим требованиям.

Это не так.Применить либо onBackpressureLatest, либо onBackpressureBuffer, а затем observeOn в observeSomePacketsOn и observeAllPacketsOn соответственно.

Делает ли вызов onBackpressureLatest так, чтобы элементы больше не передавались в многоадресном режиме?

Многоадресная рассылка осуществляется PublishProcessor, и разные подписчики будут устанавливать для нее канал независимо, где операторы onBackpressureXXX и observeOn вступают в силу для отдельных подписчиков.

Как я могу проверить свои требования?

Подписаться через Flowable с потерями или без потерь с TestSubscriber (Flowable.test()), передать известный набор пакетов в packets ипосмотрите, все ли они прибыли через TestSubscriber.assertValueCount() или TestSubscriber.values().Один с потерями должен быть 1 .. N, а один без потерь должен иметь значения N после льготного периода.

Бонус: если у меня несколько таких издателей (в одном классе или в другом месте), что такоеЛучший способ сделать один и тот же шаблон многоразовым.Создать мой собственный Flowable с делегированием / дополнительными методами?

Вы можете превратить observeAllPacketsOn в FlowableTransformer и вместо вызова метода в MyBroadcaster использовать compose, например:

class MyTransformers {
    public static FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
        return f -> f.onBackpressureLatest().observeOn(s);
    }
}

new MyBroadcaster().getPacketFlow()
.compose(MyTransformers.lossyObserveOn(scheduler))
.subscribe(/* ... */);
...