У меня есть следующий рабочий процесс: есть n
записей, которые необходимо извлечь по сети, и впоследствии n
дорогостоящих вычислений, которые необходимо выполнить для каждой.Вставьте код, это будет выглядеть следующим образом:
List<Integer> ids = {1,2,....n};
ids.forEach(id -> {
Record r = RetrieveRecord(id); // Blocking IO
ProcessRecord(r); // CPU Intensive
})
Я хотел бы преобразовать блокирующую часть в асинхронный, чтобы минимизировать время с помощью одного потока - по существу, обеспечивая запись i+1
извлекается при обработке записи i
.Чтобы выполнение выглядело следующим образом:
Retrieve(1).start()
Retrieve(1).onEnd(() -> { start Retrieve(2), Process(1) })
Retrieve(2).onEnd(() -> { start Retrieve(3), Process(2) })
....
Теперь я могу придумать наивный способ реализовать это с List<>
и CompletableFuture
, но для этого потребуется обработать первую запись по-другому.
Есть ли более элегантный способ решения этой проблемы с помощью реактивных потоков?Решение, которое, возможно, позволило бы мне легко настроить, сколько записей Process()
может отставать от Retreive()
?