Как подписаться на объединенные бесконечные текучие среды? - PullRequest
0 голосов
/ 17 октября 2019

У меня есть пара бесконечных Flowables (получение данных из BlockingQueues). Я объединяю их и подписываюсь с моим заказным подписчиком. Я не понимаю, почему я получаю сообщения только с одного входа Flowable.

Вот мой код:

<T> void example() {
    List<BlockingQueue<T>> queues = createQueues();

    List<Flowable<T>> allFlowables = queues.stream()
            .map(this::createFlowable)
            .collect(Collectors.toList());

    FlowableScan.merge(allFlowables)
            .subscribe(new DefaultSubscriber<T>() {

                @Override
                protected void onStart() {
                    System.out.println("Start!");
                    request(1);
                }

                @Override
                public void onNext(T message) {
                    System.out.println(message);
                    request(1);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {
                    System.out.println("Done!");
                }
            });
}

<T> Flowable<T> createFlowable(BlockingQueue<T> queue) {
    return Flowable.generate(emitter -> {
        T msg = takeFromQueue(q); // blocking
        emitter.onNext(msg);
    });
}

Что мне не хватает, что я вижу сообщения только из одной очереди? Я пробовал с планировщиками, но это не помогло. Как исправить приведенный выше код для потребления из всех входных очередей?

1 Ответ

1 голос
/ 18 октября 2019

Потому что вы блокируете единственный поток, обслуживающий все источники в первой очереди. Вы должны ввести асинхронность, такую ​​как применение .subscribeOn(Schedulers.io()) в createFlowable.

<T> Flowable<T> createFlowable(BlockingQueue<T> queue) {
    return Flowable.generate(emitter -> {
        T msg = takeFromQueue(q); // blocking
        emitter.onNext(msg);
    }).subscribeOn(Schedulers.io()); // <----------------------------------
}
...