Добавить побочный эффект к Flux после использования каждого элемента? - PullRequest
0 голосов
/ 01 августа 2020

Это упрощенный пример, но он иллюстрирует суть.

Допустим, у меня есть метод, определенный следующим образом:

Flux<String> generateFlux() {
  return Flux.just("hello", "world"); // (S)
}

Есть ли способ сделать, например, S напечатать что-нибудь после того, как элемент был израсходован без того, чтобы подписчик потока что-либо делал или знал об этом?

Например, я бы изменил S так, чтобы это:

generateFlux().doOnNext(System.out::println).block()

фактически выводит это на консоль:

hello
consumed
world
consumed

Возможно ли это сделать с помощью Reactor 3.3, и если да, то как?

Обновить

Думаю, мой вопрос был неясным, поэтому я добавлю несколько деталей. Суть моего вопроса заключается в том, что я хочу, чтобы это было без изменений :

generateFlux().doOnNext(System.out::println).block() // A

Поэтому я хочу изменить S, т.е. Flux.just("hello", "world"), чтобы учесть то, что я прошу, не A. Я знаю, что вы можете добавить subscribe и doOnNext et c к A, но это , а не , о чем я прошу. Я спрашиваю, есть ли способ изменить S, чтобы без каких-либо изменений в A это было напечатано, когда подписчик / потребитель A обработал все элементы:

hello
consumed
world
consumed

Т.е. этот S каким-то образом получает сигнал о том, что первый элемент был использован («привет»), а затем он может выполнить doOnNext (или что-то еще) и в этом случае вывести «потребитель». Когда второй элемент израсходован («мир»), он должен снова напечатать «потреблен».

Возможно ли это?

Ответы [ 3 ]

0 голосов
/ 01 августа 2020

Вы можете использовать doOnEach, чтобы добавить побочные эффекты к каждому элементу, потребляемому в данном потоке, без изменения издателя (ошибка и завершение тоже).

  Flux.just("hello", "world")
                .doOnEach(item -> System.out.println(item))
                .blockLast();

doOnEach Documentation

Также есть много перегрузок для разных жизненных циклов, таких как doOnError, doOnSuccess, doOnNext ...

Документация по Flux , вы можете найти там все операторы и описания к ним.

Обновление:

Согласно вашему обновлению, вы можете подумать о сквозной проблеме. Вы можете использовать Hooks для изменения операторов реактивного потока во время сборки.

Hooks Do c

0 голосов
/ 01 августа 2020

Вы можете использовать обратный вызов doOnNext, чтобы перехватить действие издателя, нажимающего следующий элемент. но используйте подписку вместо блока.

generateFlux()
.doOnNext(next => System.out.println("Next Element pushed by publisher" + next))
.subscribe();

. Вы можете использовать подписку, которая отменяет все поведение onNext, onError и onCompleted. метод подписки перегружен в Flux

0 голосов
/ 01 августа 2020

Flux - это издатель, и правильный способ получения данных из потока - это подписка. Итак,

 generateFlux.subscribe(word -> {
      System.out.println(word);
      System.out.println("consumed" + word);
});

, используя подписку с передачей только Consumer в качестве параметра, вы переопределите то, что подписчик будет делать дальше. В основном позади будет создан подписчик с этими двумя отпечатками в качестве поведения. Как работает Flux, вам необходимо понять эти 2 интерфейса

   public interface Publisher<T> {
       void subscribe(Subscriber<? super T> var1);
   }

   public interface Subscriber<T> {
       void onSubscribe(Subscription var1);
 
       void onNext(T var1);
 
       void onError(Throwable var1);

       void onComplete();
}
...