Как я могу получить последний элемент Flux, не сворачивая его с помощью lower () или last () - PullRequest
1 голос
/ 04 ноября 2019

Как я могу получить последний элемент Flux, не сворачивая его с помощью redu () или last ()? Вот мой вариант использования:

1) У меня есть генератор, который выдает Flux<T> в зависимости от состояния. 2) Когда внутреннее Flux завершает, оно изменяет состояние, которое влияет на следующие Flux объекты, которые я излучаю в генераторе.

С математической точки зрения это выглядит так

static class State {
    int secret = 2;
    int iteration = 0;
}

Random rand = new Random(1024);
Flux<Integer> stream = Flux.<Flux<Integer>, State>generate(State::new, (state, sink) -> {

    System.out.println(String.format("Generate: %d", state.secret));
    Flux<Integer> inner = Flux.range(1, rand.nextInt(10));

    sink.next(inner.doOnComplete(() -> {
        // How do I get last item of `inner` here ?
        // For example I'd like to decrement `state.secret` by last value of `inner`
    }));

    return state;
}).flatMap(Function.identity());

1 Ответ

0 голосов
/ 04 ноября 2019

Так что я взломал его, изменив переменную состояния в генераторе. Это работает, но это не очень функционально. Если кто-то еще может предложить альтернативу, я буду очень признателен.

Random rand = new Random(1024);
Flux.<Flux<String>, State>generate(State::new, (state, sink) -> {

    if (state.iteration < 4) {
        final int count = rand.nextInt(10) + 1;
        System.out.println(String.format("*** Generate %d: start %d (count %d)", state.iteration, state.secret, count));
        Flux<Integer> inner = Flux.range(state.secret, count);

        final int[] last = {Integer.MIN_VALUE};
        sink.next(
                inner
                        .doOnNext(value -> {
                            last[0] = value;
                        })
                        .map(value -> String.format("Iter %d value %d", state.iteration, value))
                        .doOnComplete(() -> {
                            System.out.println(String.format("Inner complete (last item was %d)", last[0]));
                            state.secret = last[0];
                            state.iteration += 1;
                        }));
    } else {
        System.out.println("Generate complete");
        sink.complete();
    }

    return state;
})
        .flatMap(Function.identity())
        .map(value -> {
            System.out.println(String.format("Ext map: %s", value));
            return value;
        })
        .buffer(5)
        .flatMapIterable(Function.identity())
        .subscribe(value -> System.out.println(String.format("  ---> %s", value)));

System.out.println("Exiting");
...