Отображение каждого emit - SwitchMap гарантирует как минимум 1 гибрид emit / ConcatMap? - PullRequest
0 голосов
/ 16 декабря 2018

Я ломаю голову над тем, как это сделать в RX.

T Фактический вариант использования - это отображение LowerLevelEvent (val userId: String) в HigherLevelEvent (val user: User), где Userобеспечивается observable, поэтому он может выдавать n раз, , поэтому пример вывода

LowerLevelEvent1(abc) -> HigherLevelEvent1(userAbc(nameVariation1)
LowerLevelEvent2(abc) -> HigherLevelEvent2(userAbc(nameVariation1)
LowerLevelEvent3(abc) -> HigherLevelEvent3(userAbc(nameVariation1)
LowerLevelEvent4(abc) -> HigherLevelEvent4(userAbc(nameVariation1)
                         HigherLevelEvent4(userAbc(nameVariation2)
                         HigherLevelEvent4(userAbc(nameVariation3)

Так что моим наивным решением было использовать объединение результатов.Таким образом, хотя userId не изменяется, пользовательское наблюдаемое подписывается, то есть не повторно подписывается, когда новый lowerLevelEmits и его userId не изменяются.*

Тогда я подумал о

lowerLevelObservable.
   .switchMap { lowerLevelEvent ->
      userRepository.findByIdObservable(lowerLevelEvent.userId)
          .map { user -> createHigherLevelInstance... }
   }

Это, однако, может сломаться, если lowerLevelObservable испускает быстро, и, поскольку наблюдаемое пользователем может занять некоторое время, данное событие lowerLevelX может быть пропущено, чего у меня не может быть.Кроме того, он повторно подписывает пользователя, наблюдаемого для каждого выброса, что является расточительным, поскольку он, скорее всего, не изменится

Итак, может быть, concatMap?Это связано с тем, что наблюдаемая пользователем не завершена, поэтому concatMap не будет работать.

У кого-нибудь есть подсказка?

Большое спасибо

// Разъяснение: в основном его отображениеВарианты (A1, A2 ..) к вариантам A '(A1', A2 '..) при присоединении к нему запрашиваемого объекта, где запрос является наблюдаемым, поэтому он может повторно отправляться после создания сопоставления, поэтому AX' необходимополучить новый результат запроса.Но запрос холодный и не завершается

Так пример A1(1) -> A1'(user1), A2(1) -> A2'(user1), A3(1) -> A3'(user1) - теперь кто-то меняет user1 где-то еще в приложении, поэтому следующий emit - A3'(user1')

1 Ответ

0 голосов
/ 16 декабря 2018

Исходя из сделанных вами комментариев, ниже будет работать в RxSwift.Я понятия не имею, как перевести его на RxJava.Честно говоря, я думаю, что здесь есть фундаментальное неправильное использование Rx.Удачи.

Как это работает: если ему разрешено подписываться, то он будет добавлять событие в буфер для последующего использования.Разрешается подписка, если в настоящее время она не подписана на внутреннее событие или если внутренняя наблюдаемая, на которую она в данный момент подписана, испустила элемент.

ПРЕДУПРЕЖДЕНИЕ: он не обрабатывает завершения правильно, как он есть.Я оставлю это вам в качестве упражнения.

func example(lowerLevelEventObservable: Observable<LowerLevelEvent>, userRepository: UserRepository) {
    let higherLevelEventObservable = lowerLevelEventObservable
        .flatMapAtLeastOnce { event in // RxSwift's switchLatest I think.
            Observable.combineLatest(
                Observable.just(event),
                userRepository.findByIdObservable(event.userId),
                resultSelector: { (lowLevelEvent: $0, user: $1) }
            )
        }
        .map { createHigherLevelInstance($0.lowLevelEvent, $0.user) }

    // use higherLevelEventObservable
}

extension ObservableType {
    func flatMapAtLeastOnce<U>(from fn: @escaping (E) -> Observable<U>) -> Observable<U> {
        return Observable.create { observer in
            let disposables = CompositeDisposable()
            var nexts: [E] = []
            var disposeKey: CompositeDisposable.DisposeKey?
            var isAllowedToSubscribe = true
            let lock = NSRecursiveLock()
            func nextSubscription() {
                isAllowedToSubscribe = true
                if !nexts.isEmpty {
                    let e = nexts[0]
                    nexts.remove(at: 0)
                    subscribeToInner(e)
                }
            }

            func subscribeToInner(_ element: E) {
                isAllowedToSubscribe = false
                if let key = disposeKey {
                    disposables.remove(for: key)
                }
                let disposable = fn(element).subscribe { innerEvent in
                    lock.lock(); defer { lock.unlock() }
                    switch innerEvent {
                    case .next:
                        observer.on(innerEvent)
                        nextSubscription()
                    case .error:
                        observer.on(innerEvent)
                    case .completed:
                        nextSubscription()
                    }
                }
                disposeKey = disposables.insert(disposable)
            }

            let disposable = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case let .next(element):
                    if isAllowedToSubscribe == true {
                        subscribeToInner(element)
                    }
                    else {
                        nexts.append(element)
                    }
                case let .error(error):
                    observer.onError(error)
                case .completed:
                    observer.onCompleted()
                }
            }
            _ = disposables.insert(disposable)
            return disposables
        }
    }
}
...