Flux. Есть ли способ повторить попытку последнего элемента? - PullRequest
0 голосов
/ 14 февраля 2020

Позволяет ли Flux повторить операцию для возникшего исключения, не переводя указатель в исходное положение? Я имею в виду элемент «problemmati c».

Например:

Flux.fromArray(new Integer[]{1, 2, 3})
        .delayElements(Duration.ofSeconds(1))
        .doOnNext(i -> {
            System.out.println("i: " + i);
            if (i == 2) {
                System.out.println("2 found");
                throw new RuntimeException("2!!!!!!!1");
            }
        })
        .retry(2)
        .subscribe();

будет иметь следующий вывод:

i: 1
i: 2
2 found
i: 1
i: 2
2 found
i: 1
i: 2
2 found

, когда я буду w sh чтобы увидеть такой вывод:

i: 1
i: 2
2 found
i: 2
2 found
i: 2
2 found

PS skipUntil это не то, что я ищу

1 Ответ

2 голосов
/ 14 февраля 2020

Не то чтобы я знал, но я могу ошибаться.

Тем не менее, вы можете предоставить эту логику c самостоятельно для этого конкретного шага. Например, но если вы создаете своего собственного Потребителя и оборачиваете в него логи повторных попыток c

public class RetryConsumer<T> implements Consumer<T> {

    private int                 retryCount;
    private Consumer<? super T> delegate;

    public RetryConsumer(int retryCount, Consumer<? super T> delegate) {
        this.retryCount = retryCount;
        this.delegate = delegate;
    }

    @Override
    public void accept(T value) {

        int currentAttempts = 0;
        while (currentAttempts < retryCount) {
            try {
                delegate.accept(value);
                break;
            } catch (Throwable e) {
                currentAttempts++;
                if (currentAttempts == retryCount) {
                    throw e;
                }
                //Still have some attempts left
            }
        }

    }
}

Вы можете затем использовать это в своих шагах Flux, то есть

Flux.fromArray(new Integer[]{1, 2, 3})
    .doOnNext(new RetryConsumer<>(2 , i -> {
        System.out.println("i: " + i);
        if (i == 2) {
            System.out.println("2 found");
            throw new RuntimeException("Error");
        }
     }))
     .subscribe();
...