Отбор всех элементов, кроме первых, из потока в проектном реакторе - PullRequest
0 голосов
/ 19 февраля 2020

В проектном потоке реактора есть метод отбора проб Флюс # пробы java до c. Он изменяет поток так, что он генерирует события только в конце указанных периодов.

Можно ли настроить это поведение и добиться этого: на первом элементе - испустить его мгновенно, начать выборку с задержкой от 2-го до конца. По сути, я хочу исключить первый (и только первый) элемент из выборки, чтобы он выделялся без первоначального ожидания.

Можно ли достичь с помощью встроенных операторов? Если нет, то есть ли у кого-нибудь идеи, как решить эту проблему?

Вот самый простой пример того, чего я хочу достичь:

Flux<String> inputFlux = Flux.just("first", "second", "third").delayElements(Duration.ofMillis(400));
Flux<String> transformed = /*do some magic with input flux*/;

StepVerifier.create(transformed)
    .expectNext("first")//first should always be emmited instantly
    //second arrives 400ms after first
    //third arrives 400ms after second
    .expectNoEvent(Duration.ofSeconds(1))
    .expectNext("third")//after sample period last received element should be received 
    .verifyComplete();

Ответы [ 2 ]

1 голос
/ 21 февраля 2020

Превратив поток источника myFlux в горячий поток, вы легко достигните этого:

Flux<T> myFlux;
Flux<T> sharedFlux = myFlux.publish().refCount(2);

Flux<T> first = sharedFlux.take(1);
Flux<T> sampledRest = sharedFlux.skip(1).sample(Duration.ofMillis(whatever));

return Flux.merge(first, sampledRest);
0 голосов
/ 19 февраля 2020

Вы можете достичь этого с помощью метода Flux#sample(org.reactivestreams.Publisher<U>).

yourFlux.take(1)
        .mergeWith(yourFlux.sample(Flux.interval(yourInterval)
                .delaySubscription(yourFlux.take(1))))
...