Я не уверен, что это действительно соответствует моим требованиям.
Это не так.Применить либо onBackpressureLatest
, либо onBackpressureBuffer
, а затем observeOn
в observeSomePacketsOn
и observeAllPacketsOn
соответственно.
Делает ли вызов onBackpressureLatest так, чтобы элементы больше не передавались в многоадресном режиме?
Многоадресная рассылка осуществляется PublishProcessor
, и разные подписчики будут устанавливать для нее канал независимо, где операторы onBackpressureXXX
и observeOn
вступают в силу для отдельных подписчиков.
Как я могу проверить свои требования?
Подписаться через Flowable
с потерями или без потерь с TestSubscriber
(Flowable.test()
), передать известный набор пакетов в packets
ипосмотрите, все ли они прибыли через TestSubscriber.assertValueCount()
или TestSubscriber.values()
.Один с потерями должен быть 1 .. N, а один без потерь должен иметь значения N после льготного периода.
Бонус: если у меня несколько таких издателей (в одном классе или в другом месте), что такоеЛучший способ сделать один и тот же шаблон многоразовым.Создать мой собственный Flowable с делегированием / дополнительными методами?
Вы можете превратить observeAllPacketsOn
в FlowableTransformer
и вместо вызова метода в MyBroadcaster использовать compose
, например:
class MyTransformers {
public static FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
return f -> f.onBackpressureLatest().observeOn(s);
}
}
new MyBroadcaster().getPacketFlow()
.compose(MyTransformers.lossyObserveOn(scheduler))
.subscribe(/* ... */);