У меня есть пара бесконечных Flowables (получение данных из BlockingQueues). Я объединяю их и подписываюсь с моим заказным подписчиком. Я не понимаю, почему я получаю сообщения только с одного входа Flowable.
Вот мой код:
<T> void example() {
List<BlockingQueue<T>> queues = createQueues();
List<Flowable<T>> allFlowables = queues.stream()
.map(this::createFlowable)
.collect(Collectors.toList());
FlowableScan.merge(allFlowables)
.subscribe(new DefaultSubscriber<T>() {
@Override
protected void onStart() {
System.out.println("Start!");
request(1);
}
@Override
public void onNext(T message) {
System.out.println(message);
request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
}
<T> Flowable<T> createFlowable(BlockingQueue<T> queue) {
return Flowable.generate(emitter -> {
T msg = takeFromQueue(q); // blocking
emitter.onNext(msg);
});
}
Что мне не хватает, что я вижу сообщения только из одной очереди? Я пробовал с планировщиками, но это не помогло. Как исправить приведенный выше код для потребления из всех входных очередей?