RxJava Асинхронный дебасинг - PullRequest
0 голосов
/ 26 апреля 2018

Используя операции RxJava, возможно ли "игнорировать" элементы, которые были обработаны в нисходящем потоке, если восходящий поток испускает новые элементы?

например

Observable.create(...)
  .flatMap(...) // 30 seconds to process (Asynchronous)
  .flatMap(...) // 30 seconds to process (Asynchronous)
  etc...

что мне нужно сделать, это если вышестоящий поток уведомит о новом элементе, он отменит любую операцию в потоке ниже текущего, который выполняется в данный момент, даже если нисходящий поток делает что-то в другом потоке асинхронно.

1 Ответ

0 голосов
/ 26 апреля 2018

Вместо оператора flatMap() вы можете использовать switchMap(). Когда в switchMap() получено новое значение, старая наблюдаемая отписывается, а наблюдаемая замена подписывается.

Observable.create( ... )
  .switchMap( value -> getObservable1( value )
                         .switchMap( value2 -> getObservable2( value2 ) )
  ...

Если вы хотите, чтобы наблюдатели, находящиеся ниже по течению, также были отменены, вам придется распространить switchMap(). В приведенном выше коде эмиссия на первой стадии цепочки наблюдателей будет отписываться от getObservable1( value ) и getObservable2( value2 ).

...