RxSwift flatMapПоследний без удаления предыдущих наблюдаемых - PullRequest
0 голосов
/ 01 июня 2018

Я использую RxSwift Playground из: https://github.com/ReactiveX/RxSwift

Я реализовал следующий пример кода для моделирования простой асинхронной задачи, преобразовав Observable of Int в Observable of String:

let pubishSubject = PublishSubject<Int>()
pubishSubject.asObservable()
    .debug("before")
    .flatMap ({ (value) -> Observable<String> in
        let task: Observable<String> = Observable.create { observer in
            DispatchQueue.main.asyncAfter(deadline: .now() + 1, execute: {
                print("Executed \(value)")
                observer.on(.next("Async \(value)"))
                observer.on(.completed)
            })
            return Disposables.create(with: {
                print("Disposed \(value)")
            })
        }
        return task
    })
    .debug("after")
    .subscribe()

pubishSubject.onNext(1)
pubishSubject.onNext(2)
pubishSubject.onNext(3)

Отладочный вывод:

2018-06-01 14:08:35.748: after -> subscribed
2018-06-01 14:08:35.749: before -> subscribed
2018-06-01 14:08:35.751: before -> Event next(1)
2018-06-01 14:08:35.753: before -> Event next(2)
2018-06-01 14:08:35.753: before -> Event next(3)
Executed 1
2018-06-01 14:08:36.785: after -> Event next(Async 1)
Disposed 1
Executed 2
2018-06-01 14:08:36.786: after -> Event next(Async 2)
Disposed 2
Executed 3
2018-06-01 14:08:36.787: after -> Event next(Async 3)
Disposed 3

Мне нужно выполнить все асинхронные задачи, чтобы продолжить, что работает совершенно нормально, но мне нужно подписаться только на самый последний результат (Async 3).

Я не могу использовать оператор flatMapLastest, потому что он отменяет / удаляет предыдущие асинхронные вызовы.

Я понятия не имею, как этого добиться, и мне кажется, что меня полностью заблокировали, может быть, я могу использовать для этого простой оператор RxSwift илиМне нужно каким-то образом разбить последовательность.

Я надеюсь, что у кого-то есть идея, как этого добиться, и он может мне помочь, большое спасибо.

Редактировать: Для меня это больше похоже на реализацию очереди, поскольку новые значения Int публикуются в PublishSubject И все еще выполняются асинхронные задачи, он должен пропускать все промежуточные результаты и возвращать последний результат на основепоследний испущенный Int

1 Ответ

0 голосов
/ 01 июня 2018

У меня есть решение в RxJava.Перевод в RxSwift должен быть простым.

Используйте оператор switchMap() для постоянного переключения на последнюю эмиссию.Чтобы завершить предыдущие выбросы, вы должны подписаться на них, а также иметь значения выбросов.Для этого используйте оператор publish():

@Test
public void testSwitchMap() throws Exception {
    PublishSubject<Integer> source = PublishSubject.create();

    source
            .doOnNext( v -> logger.debug( "emission: {}", v ) )
            .switchMap( v -> Observable.just( v )
                    .delay( 100, TimeUnit.MILLISECONDS )
                    .publish( s -> {
                        s.subscribe();
                        return s;
                    } ) )
            .subscribe( v -> logger.debug( "end result {}", v ) );

    source.onNext( Integer.valueOf( 1 ) );
    source.onNext( Integer.valueOf( 2 ) );
    source.onNext( Integer.valueOf( 3 ) );
    Thread.sleep( 1_000 );
}
...