Проблема в том, что vaultQueueConsumer
включает медленную работу. Таким образом, решение состоит в том, чтобы извлечь эту медленную операцию из generate
в map
, которую можно распараллелить.
В качестве идеи вы можете создать имя очереди, из которой должны поступать сообщения, и выполнять фактическую потребление сообщения в методе map
после параллельного потока:
String queue = "test";
Flux.<String>generate(synchronousSink -> synchronousSink.next(queue))
.parallel()
.runOn(Schedulers.parallel())
.map(queueManipulator::readMessage)
.doOnNext(log::info)
.subscribe();
Подделка QueueManipulator
спит 1-2 секунды перед возвратом сообщения:
public class QueueManipulator {
private final AtomicLong counter = new AtomicLong();
public String readMessage(String queue) {
sleep(); //sleep 1-2 seconds
return queue + " " + counter.incrementAndGet();
}
//...
}
Таким образом, потребление сообщения выполняется параллельно:
12:49:22.362 [parallel-4] - test 2
12:49:22.362 [parallel-3] - test 4
12:49:22.362 [parallel-2] - test 1
12:49:22.362 [parallel-1] - test 3
12:49:23.369 [parallel-3] - test 6
12:49:23.369 [parallel-1] - test 5
12:49:23.369 [parallel-2] - test 7
12:49:23.369 [parallel-4] - test 8
Приведенное выше решение является простой покупкой, может выглядеть как «взлом».
Другая идея - позвонить Flux.generate
в flatMap
:
String queue = "test";
int parallelism = 5;
Flux.range(0, parallelism)
.flatMap(i ->
Flux.<String>generate(synchronousSink -> {
synchronousSink.next(queueManipulator.readMessage(queue));
}).subscribeOn(Schedulers.parallel()))
.doOnNext(log::info)
.subscribe();