Маршрутизация текучих значений - PullRequest
0 голосов
/ 25 марта 2019

У меня есть серверное приложение, которое имеет входящие и исходящие RxJava2 (2.2.7) Flowables для клиентов.
Входящие значения содержат идентификатор целевого клиента, куда они должны перейти.

IПредполагается, что для каждого исходящего Flowable, как этот (должна быть добавлена ​​обработка complete / error):

class Out {
    BlockingArrayQueue queue;

    ... = Flowable.generate( em -> {
        //block until item available
        em.onNext(queue.take());
    });
}

и для каждого входящего Flowable

in.subscribe(
    ...
    void onNext( Msg msg ){
        Out o = getOutByTargetId( msg.getTarget() );
        o.queue.put(msg); // may block if previous not yet taken
        subscription.request(1);
    }
    ...
);

Но недостатки этого решения,что я вижу:

  • Каждый выходной поток может блокировать поток самостоятельно
  • Входные данные могут блокироваться, если выход еще не обработан.Может ли это как-то использовать противодавление?

Вопрос: Это путь?Есть улучшения или совсем другой подход?

Фрэнк

...