Проверить начало потока в Reactor Flux - PullRequest
1 голос
/ 24 марта 2020

Используя Reactor, я пытаюсь проверить начало холодного потока Флюса, а затем стать сквозным.

Например, скажем, мне нужно проверить первые N элементов. Если (и только если) он проходит, эти и другие элементы пересылаются. Если это не удается, выдается только ошибка.

Это то, что я до сих пор. Это работает, но есть ли лучший или более правильный способ сделать это? Я испытывал желание реализовать свой собственный оператор, но мне сказали, что это сложно и не рекомендуется.

flux
.bufferUntil(new Predicate<>() {
    private int count = 0;

    @Override
    public boolean test(T next) {
        return ++count >= N;
    }
})
// Zip with index to know the first element
.zipWith(Flux.<Integer, Integer>generate(() -> 0, (cur, s) -> {
    s.next(cur);
    return cur + 1;
}))
.map(t -> {
    if (t.getT2() == 0 && !validate(t.getT1()))
        throw new RuntimeException("Invalid");
    return t.getT1();
})
// Flatten buffered elements
.flatMapIterable(identity())

Я мог бы использовать doOnNext вместо второго map, поскольку он ничего не отображает , но я не уверен, что это приемлемое использование peek методов.

Я мог бы также использовать преобразователь с сохранением состояния во втором map для запуска только один раз вместо архивирования с индексом, я думаю, это приемлемо так как я уже использую предикат с состоянием ...

1 Ответ

0 голосов
/ 28 марта 2020

Ваше требование звучит интересно! У нас есть switchOnFirst, который может быть полезен для проверки первого элемента. Но если у вас есть N элементов для проверки, мы можем попробовать что-то вроде этого.

Здесь я предполагаю, что мне нужно проверить первые 5 элементов, которые должны быть <= 5. Тогда это допустимый поток. В противном случае мы просто выдавали бы ошибку, говоря, что проверка не удалась. </p>

Flux<Integer> integerFlux = Flux.range(1, 10).delayElements(Duration.ofSeconds(1));

integerFlux
        .buffer(5)
        .switchOnFirst((signal, flux) -> {
            //first 5 elements are <= 5, then it is a valid stream
            return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
        })
        .flatMapIterable(Function.identity())
        .subscribe(System.out::println,
                System.out::println);   

Однако этот подход не годится, так как он продолжает собирать 5 элементов каждый раз, даже после того, как первая проверка выполнена, чего мы можем не хотеть.

Чтобы избежать буферизации N элементов после проверки, мы можем использовать bufferUntil. После того, как мы собрали первые N элементов и проверили, он просто передал бы 1 элемент по мере поступления в нисходящий поток.

AtomicInteger atomicInteger = new AtomicInteger(1);
integerFlux
        .bufferUntil(i -> {
            if(atomicInteger.get() < 5){
                atomicInteger.incrementAndGet();
                return false;
            }
            return true;
        })
        .switchOnFirst((signal, flux) -> {
            return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
        })
        .flatMapIterable(Function.identity())
        .subscribe(System.out::println,
                   System.out::println);
...