Project Reactor: асинхронный doOnNext (или другие doOnXXX) - PullRequest
0 голосов
/ 12 декабря 2018

Есть ли какой-нибудь метод, например doOnNext, но async?Например, мне нужно сделать долгую регистрацию (отправлено уведомление по электронной почте) для определенного элемента.

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            // For example, we need to do something time-consuming only for 3

            if (v.equals(3)) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("LOG FOR " + v);
        });

ints.subscribe(System.out::println);

Но почему я должен ждать регистрации 3?Я хочу сделать эту логику асинхронно.

Теперь у меня есть только это решение

Thread.sleep(10000);

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Scheduler myParallel2 = Schedulers.newParallel("my-parallel2", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            Mono.just(v).publishOn(myParallel2).subscribe(value -> {
                if (value.equals(3)) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println("LOG FOR " + value);
            });
        });

ints.subscribe(System.out::println);

Есть ли какое-нибудь "хорошее" решение для этого?

Ответы [ 2 ]

0 голосов
/ 19 декабря 2018
    Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
            .flatMap(integer -> {
                        if (integer != 3) {
                            return Mono.just(integer)
                                    .map(integer1 -> {
                                        System.out.println(integer1);
                                        return integer;
                                    })
                                    .subscribeOn(Schedulers.parallel());
                        } else {
                            return Mono.just(integer)
                                    .delayElement(Duration.ofSeconds(3))
                                    .map(integer1 -> {
                                        System.out.println(integer1);
                                        return integer;
                                    })
                                    .subscribeOn(Schedulers.parallel());
                        }
                    }

            );

    ints.subscribe();
0 голосов
/ 12 декабря 2018

Если вы абсолютно уверены, что вам все равно, отправка электронной почты будет успешной или нет, то вы могли бы использовать "subscribe-inside-doOnNext", но я уверен, что это будет ошибкой..

Для того, чтобы ваш Flux распространял сигнал onError в случае сбоя "регистрации", рекомендуется использовать flatMap.

Хорошая новость заключается в том, что, посколькуflatMap немедленно объединяет результаты внутренних издателей в основную последовательность, вы можете немедленно отправлять каждый элемент И запускать электронную почту.Единственное предостережение в том, что все это будет только завершено после того, как отправка электронной почты Mono также будет завершена.Вы также можете проверить в flatMap лямбде, нужно ли вообще вести запись (а не внутри Mono):

//assuming sendEmail returns a Mono<Void>, takes care of offsetting any blocking send onto another Scheduler

source //we assume elements are also publishOn as relevant in `source`
   .flatMap(v -> {
       //if we can decide right away wether or not to send email, better do it here
       if (shouldSendEmailFor(v)) {
           //we want to immediately re-emit the value, then trigger email and wait for it to complete
           return Mono.just(v)
               .concatWith(
                   //since Mono<Void> never emits onNext, it is ok to cast it to V
                   //which makes it compatible with concat, keeping the whole thing a Flux<V>
                   sendEmail(v).cast(V.class)
               );
       } else {
           return Mono.just(v);
       }
    });
...