У меня есть серверное приложение, которое имеет входящие и исходящие RxJava2 (2.2.7) Flowables для клиентов.
Входящие значения содержат идентификатор целевого клиента, куда они должны перейти.
IПредполагается, что для каждого исходящего Flowable, как этот (должна быть добавлена обработка complete / error):
class Out {
BlockingArrayQueue queue;
... = Flowable.generate( em -> {
//block until item available
em.onNext(queue.take());
});
}
и для каждого входящего Flowable
in.subscribe(
...
void onNext( Msg msg ){
Out o = getOutByTargetId( msg.getTarget() );
o.queue.put(msg); // may block if previous not yet taken
subscription.request(1);
}
...
);
Но недостатки этого решения,что я вижу:
- Каждый выходной поток может блокировать поток самостоятельно
- Входные данные могут блокироваться, если выход еще не обработан.Может ли это как-то использовать противодавление?
Вопрос: Это путь?Есть улучшения или совсем другой подход?
Фрэнк