Мне нужно было вызывать две нисходящие системы параллельно с неблокирующим вводом из API-интерфейса моей службы обслуживания на основе Spring flux. Но емкость первой нисходящей системы составляет 10 запросов за раз, а вторая нисходящая система - 100.
Первая выходная нисходящая система является входом для второй нисходящей системы, поэтому я могу сделать более параллельный запрос ко второй системе ускорить процесс.
Второй ответ нисходящей системы очень велик, поэтому он не может быть сохранен в памяти, чтобы конкретизировать весь ответ. Так немедленно нужно вернуть ответ клиенту.
Рабочий процесс Ex:
Пример кода:
@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getstream() {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<CompletableFuture> list = new ArrayList<>();
AtomicInteger ai = new AtomicInteger(1);
RestTemplate restTemplate = new RestTemplate();
for (int i = 0; i < 100; i++) {
CompletableFuture<Object> cff = CompletableFuture.supplyAsync(
() -> ai.getAndAdd(1) + " first downstream web service " +
restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get(), String.class)
).thenApplyAsync(v -> {
Random r = new Random();
Integer in = r.nextInt(1000);
return v + " second downstream web service " + in + " " + restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get() + 1, String.class) + " \n";
}, executor);
list.add(cff);
}
return Flux.fromStream(list.stream().map(m -> {
try {
return m.get().toString();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return "";
})
);
}
Этот код работает только для первых пяти темы после получения ответа все темы завершили процесс. Но мне нужно было немедленно получить ответ клиенту, как только я получу ответ от второй нисходящей системы.
Примечание. Приведенный выше код не реализован с пулом потоков второго уровня.
Заранее спасибо.