Тайм-аут реактора проекта - PullRequest
0 голосов
/ 25 января 2020

Я работаю над мастерской проектного реактора и застрял со следующей задачей:

/**
     * TODO 5
     * <p>
     * For each item call received in colors flux call the {@link #simulateRemoteCall} operation.
     * Timeout in case the {@link #simulateRemoteCall} does not return within 400 ms, but retry twice
     * If still no response then provide "default" as a return value
     */

Проблема, которую я не могу обернуть, заключается в том, что Flux на самом деле никогда не генерирует TimeOutException! Я могу наблюдать это в журнале консоли:

16:05:09.759 [main] INFO Part04HandlingErrors - Received red delaying for 300 
16:05:09.781 [main] INFO Part04HandlingErrors - Received black delaying for 500 
16:05:09.782 [main] INFO Part04HandlingErrors - Received tan delaying for 300 

Я попытался изменить порядок утверждений, хотя это, похоже, не изменило поведение. Примечание: Кроме того, я попробовал перегруженный вариант timeout (), который принимает значение по умолчанию, которое должно быть возвращено, если элемент не генерируется.

public Flux<String> timeOutWithRetry(Flux<String> colors) {

        return colors
                .timeout(Duration.ofMillis(400))
                //.timeout(Duration.ofMillis(400), Mono.just("default"))
                .retry(2)
                .flatMap(this::simulateRemoteCall)
                .onErrorReturn(TimeoutException.class, "default");

    }

Может кто-нибудь выяснить, почему не происходит тайм-аут ? Я подозреваю, что механизм как-то не «привязан» к методу, вызываемому flatMap.

Для полноты: вспомогательный метод:

public Mono<String> simulateRemoteCall(String input) {
        int delay = input.length() * 100;
        return Mono.just(input)
                .doOnNext(s -> log.info("Received {} delaying for {} ", s, delay))
                .map(i -> "processed " + i)
                .delayElement(Duration.of(delay, ChronoUnit.MILLIS));
    }

Больше полноты, это тест, который мне дают для проверки работоспособности:

@Test
    public void timeOutWithRetry() {
        Flux<String> colors = Flux.just("red", "black", "tan");

        Flux<String> results = workshop.timeOutWithRetry(colors);

        StepVerifier.create(results).expectNext("processed red", "default", "processed tan").verifyComplete();
    }

Ответы [ 2 ]

1 голос
/ 29 января 2020

Ответ Мартина Тарьяни правильный, но вы также спросили, почему в вашем коде

    return colors
            .timeout(Duration.ofMillis(400))
            //.timeout(Duration.ofMillis(400), Mono.just("default"))
            .retry(2)
            .flatMap(this::simulateRemoteCall)
            .onErrorReturn(TimeoutException.class, "default");

тайм-аут не происходит.

Причина в том, что если элементы colors поток доступен, тогда вызов .timeout(Duration.ofMillis(400)) не имеет никакого эффекта, поскольку timeout распространяет только TimeoutException, если нет элемент испускается в течение заданной длительности 400 мс, но это не так в этом примере.

В результате элемент испускается, и retry(2) также не имеет никакого эффекта. Затем вы вызываете simulateRemoteCall для испускаемого элемента, который занимает некоторое время, но не возвращает ошибку. Результат вашего кода (помимо временных различий) такой же, как если бы вы просто применили карту к данному потоку:

public Flux<String> timeOutWithRetry(Flux<String> colors) {
    return colors.map(s -> "processed " + s);
}

Если вы хотите увидеть тайм-аут при вызове simulateRemoteCall, тогда вы должны добавьте метод timeout после этого вызова.

Вместо использования flatMap вы также можете использовать concatMap. Разница заключается в том, должен ли порядок сохраняться или нет, то есть могут ли значения default выходить из строя или нет.

При использовании concatMap ответ выглядит следующим образом:

public Flux<String> timeOutWithRetry(Flux<String> colors) {
    return colors.concatMap(
            color -> simulateRemoteCall(color)
                        .timeout(Duration.ofMillis(400))
                        .retry(2)
                        .onErrorReturn("default"));
}
1 голос
/ 25 января 2020

Вы правы в том, что это неправильный порядок и место утверждений. Поскольку вы хотите повторить / тайм-аут / обработать ошибку удаленного вызова, вы должны поместить эти операторы в Mono удаленного вызова вместо Flux.

Тайм-аут на Flux наблюдает время между последующими элементами Однако, когда вы используете flatMap, вы получаете параллелизм из коробки, и задержка между элементами практически равна нулю (при условии, что colors Flux получен из списка в памяти). Так что этот оператор не следует ставить непосредственно на Flux для достижения вашей цели.

Повторить на Flux означает, что он повторно подписывается на источник в случае ошибки, что в зависимости от источника может привести к обработка уже обработанных элементов. Вместо этого вы хотите повторить только неудачные элементы, поэтому его также следует поставить на Mono.

public Flux<String> timeOutWithRetry(Flux<String> colors) {

    return colors.flatMap(color -> simulateRemoteCall(color).timeout(Duration.ofMillis(400))
                                                            .retry(2)
                                                            .onErrorReturn("default"));
}
...