Springboot.Реактивный веб-клиент.Соединение преждевременно закрыто ПЕРЕД ответом - PullRequest
0 голосов
/ 25 декабря 2018

Я сталкивался с этой проблемой

При подключении к пулу обнаружена ошибка реактор.netty.http.client.HttpClientOperations $ PrematureCloseException: Соединение преждевременно закрыто ПЕРЕД ответом ".

Я собираю метрики с графитового сервера через реактивный веб-клиент для запрошенных таймфреймов (чтобы уменьшить объем данных, передаваемых по http, я разделил дни на куски 24/4), затем объединяю ответы в матрицуи сохраните его в CSV-файл -> объединить с другим. Проблема возникает, когда количество дней увеличивается (2 или 3 работает нормально, но больше дней будет больше ошибок при закрытых соединениях). Попытка использовать задержки, это помогаетбит, но обработать еще один день без ошибок.

Stack-trace: ClosedConnectionStacktrace

Обнаружена немного похожая проблема https://github.com/reactor/reactor-netty/issues/413, но не уверен.

Вот фрагменты кода:

discoveryMono.thenReturn(true) // discover metrics
            .flux()
            .flatMap(m -> Flux.fromIterable(dates) // process all days
                    .delayElements(Duration.ofSeconds(1L))
                    .flatMap(date -> Flux.range(0, 24 / intervalHours) // divide day into chunks
                            .delayElements(Duration.of(100L, ChronoUnit.MILLIS))
                            .flatMap(timeFraction -> Flux.fromIterable(sequentialTasks) // task to invoke webclient
                                    .flatMap(task -> {
                                        Instant from = date.plus(timeFraction * intervalHours, ChronoUnit.HOURS);
                                        Instant until = from.plus(intervalHours, ChronoUnit.HOURS);
                                        TaskParams taskParams = new TaskParams(itSystem, from, until, TaskParams.PollingType.FULLDAY);
                                        log.trace("workflow | from={}, until={}", from, until);
                                        return task.apply(taskParams)
    //                                            .doOnNext(m -> log.trace("Matrix: {}", m))
                                                .onErrorResume(err -> {
                                                    log.error("processFullDaysInChunks | Error: {}", err);
                                                    return Mono.empty();
                                                });
                                    }).flatMap(params -> Flux.fromIterable(fileTasks) // tasks to check/merge files, doesn't matter
                                            .flatMap(fileTask -> parTask.apply(params)
                                                    .onErrorResume(err -> {
                                                        log.error("processFullDaysInChunks | Error: {}", err);
                                                        return Mono.empty();
                                                    })
                                            )
                                    )
                            )
                    )
            ).subscribeOn(fullDayScheduler).subscribe();

и часть задачи с вызовом веб-клиента:

    private Flux<GraphiteResultDTO> getGraphiteResults(ITSystem itSystem, Instant from, Instant until) {
        String fromStr = FROM_PARAMETER + Long.valueOf(from.getEpochSecond()).toString();
        String untilStr = UNTIL_PARAMETER + Long.valueOf(until.getEpochSecond()).toString();
        String uri = RENDER_URI + TARGET_PARAMETER + "{targetParam}" + fromStr + untilStr + FORMAT_JSON_PARAMETER;
        WebClient webClient = getGraphiteWebClient(itSystem.getDataSource());
        Set<String> targetParams = storage.getValueByITSystemId(itSystem.getId()).getSecond();
        Flux<GraphiteResultDTO> result = Flux.fromIterable(targetParams)
                .delayElements(Duration.of(10, ChronoUnit.MILLIS))
                .flatMap(targetParam -> {
                    Map<String, String> params = Map.ofEntries(entry("targetParam", targetParam));
                    if (log.isTraceEnabled()) {
                        log.trace("getGraphiteResults | Uri={}, TargetPatam: {}", uri, targetParam);
                    }
                    return webClient.get()
                            .uri(uri, params)
                            .retrieve()
                            .onStatus(HttpStatus::isError, clientResponse -> {
                                log.trace("clientResponse | transforming body");
                                clientResponse.bodyToMono(String.class)
                                        .doOnNext(errorString -> log.error("retrieve(), error={}", errorString));
//                                                    .flatMap(s -> Flux.error(clientResponse.bodyToFlux(WebClientException.class)));
                                return Mono.empty();
                            })
                            .bodyToFlux(GraphiteResultDTO.class)
                            .onErrorResume(throwable -> {
                                log.error("webclient | bodyToFlux error={}", throwable.getMessage());
                                return Flux.empty();
                            });
                });
        return result;
    }

1 Ответ

0 голосов
/ 19 августа 2019

Решена моя проблема с заменой оператора flatMap на concatMap с предварительной выборкой 1 и ограничением скорости (оператор limitRate).Все запросы теперь обрабатываются последовательно.Так что теперь нет необходимости использовать временные задержки.

...