RxJava асинхронная альтернатива doOnNext - PullRequest
0 голосов
/ 26 мая 2018

Я хочу сделать длинную операцию с RxJava, не блокируя их использование методом подписки (или другим способом).longOperation () просто сохраняет значение в БД, поэтому оно не может влиять на само значение.У меня есть следующий код:

Observable.interval(1, TimeUnit.SECONDS)
          .doOnNext { 
              value -> longOperation(value)
           }
          .subscribe {
              finalConsumer(it)  
           }

Как сделать это асинхронным?

Спасибо.

Ответы [ 3 ]

0 голосов
/ 29 мая 2018

Почему бы не использовать subscribeOn?Если вы используете subscribeOn(Schedulers.io()) и не используете observeOn выше doOnNext, ваш код будет выполняться в потоке ввода-вывода.

0 голосов
/ 03 июля 2018

Это должно работать

Observable.interval(1, TimeUnit.SECONDS)
        .flatMap(value -> Observable.just(value)
                .mergeWith(Completable
                        .fromAction(() -> longOperation(value))
                        .subscribeOn(Schedulers.io())))
        .subscribe(finalConsumer(it));
0 голосов
/ 27 мая 2018

Используйте flatMap и subscribeOn:

Observable.interval(1, TimeUnit.SECONDS)
  .flatMap(value ->
      Completable.fromAction(() ->
        longOperation(value))
      .toObservable()
      .subscribeOn(Scheduler.io()))
  .subscribe {
     subscriber  
  }

Вы также можете захотеть ограничить планировщик, заменив Schedulers.io() на Schedulers.from(Executors.newFixedThreadPool(n)).

...