Как вернуть ответ немедленно клиенту в Spring Flux, контролируя количество потоков, используя ExecutorService и CompletableFuture? - PullRequest
0 голосов
/ 13 января 2020

Мне нужно было вызывать две нисходящие системы параллельно с неблокирующим вводом из API-интерфейса моей службы обслуживания на основе Spring flux. Но емкость первой нисходящей системы составляет 10 запросов за раз, а вторая нисходящая система - 100.

Первая выходная нисходящая система является входом для второй нисходящей системы, поэтому я могу сделать более параллельный запрос ко второй системе ускорить процесс.

Второй ответ нисходящей системы очень велик, поэтому он не может быть сохранен в памяти, чтобы конкретизировать весь ответ. Так немедленно нужно вернуть ответ клиенту.

Рабочий процесс Ex:

enter image description here

Пример кода:

@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getstream() {

    ExecutorService executor = Executors.newFixedThreadPool(10);

    List<CompletableFuture> list = new ArrayList<>();

    AtomicInteger ai = new AtomicInteger(1);
    RestTemplate restTemplate = new RestTemplate();

    for (int i = 0; i < 100; i++) {
        CompletableFuture<Object> cff = CompletableFuture.supplyAsync(

                () -> ai.getAndAdd(1) + " first downstream web service " +
                        restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get(), String.class)

        ).thenApplyAsync(v -> {

            Random r = new Random();
            Integer in = r.nextInt(1000);

            return v + " second downstream web service  " + in + " " + restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get() + 1, String.class) + " \n";
        }, executor);

        list.add(cff);
    }

    return Flux.fromStream(list.stream().map(m -> {
                try {
                    return m.get().toString();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return "";
            })
    );

}

Этот код работает только для первых пяти темы после получения ответа все темы завершили процесс. Но мне нужно было немедленно получить ответ клиенту, как только я получу ответ от второй нисходящей системы.

Примечание. Приведенный выше код не реализован с пулом потоков второго уровня.

Заранее спасибо.

1 Ответ

1 голос
/ 13 января 2020

Если вы строите неблокирующую систему с использованием Spring-Webflux, лучше использовать возможности WebClient в вашем примере. Я создал простое тестовое приложение, в котором для меня работал следующий фрагмент кода:

private final WebClient w = WebClient.create("http://localhost:8080/call"); // web client for external system


@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyClass> getstream() {
    return Flux
            .range(0, 100) // prepare initial 100 requests
            .window(10) // combine elements in batch of 10 (probably buffer will fit better, have a look)

            // .delayElements(Duration.ofSeconds(5)) for testing purpose you can use this function as well
            .doOnNext(flow -> log.info("Batch of 10 is ready")) // double check tells that batch is ready

            .flatMap(flow -> flow
                    // perform an external async call for each element in batch of 10
                    // they will be executed sequentially but there will not be any performance issues because
                    // calls are async. If you wish you can add .parallel() to the flow to make it parallel
                    .flatMap(element -> w.get().exchange())
                    .map(r -> r.bodyToMono(MyClass.class))
            )

            // subscribe to each response and throw received element further to the stream
            .flatMap(response -> Mono.create(s -> response.subscribe(s::success)))

            .window(1000) // batch of 1000 is ready
            .flatMap(flow -> flow
                    .flatMap(element -> w.get().exchange())
                    .map(r -> r.bodyToMono(MyClass.class))
            )
            .flatMap(response -> Mono.create(s -> response.subscribe(s::success)));
}

public static class MyClass {
    public Integer i;
}

ОБНОВЛЕНИЕ:

Я подготовил небольшое приложение для воспроизведения вашего случая. Вы можете найти его в моем хранилище .

...