RxSwift.Выполнять отдельные Observables последовательно - PullRequest
0 голосов
/ 28 сентября 2018

Я пытаюсь выполнить свои Наблюдаемые, чтобы они выполнялись только после завершения предыдущего Наблюдения.Я не могу использовать flatMap, потому что подписки могут вызываться из разных мест, и эти Observables не связаны друг с другом.Чтобы быть конкретным: мой CollectionView загружает больше контента с сервера и через 2 секунды после этого пользователь нажимает кнопку «Отправить комментарий», пока CollectionView все еще загружает свой пакет.Поэтому я хочу подождать, пока обновление CollectionView завершится, и только после этого выполнить запрос на публикацию моего комментария.Я создал класс с именем ObservableQueue, и он работает просто отлично.Но мне нужно знать, есть ли такие проблемы, как утечки памяти, мертвые блокировки или, может быть, я просто что-то упустил.Вот оно:

extension CompositeDisposable {

    @discardableResult
    func insert(disposeAction: @escaping () -> ()) -> DisposeKey? {
        return insert(Disposables.create(with: disposeAction))
    }

}

class ObservableQueue {

    private let lock = NSRecursiveLock()
    private let relay = BehaviorRelay(value: 0)
    private let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "ObservableQueue.scheduler")

    func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
        return Observable.create({ observer -> Disposable in
            let disposable = CompositeDisposable()

            let relayDisposable = self
                .relay
                .observeOn(self.scheduler)
                .filter({ value -> Bool in
                    if value > 0 {
                        return false
                    }

                    self.lock.lock(); defer { self.lock.unlock() }

                    if self.relay.value > 0 {
                        return false
                    }

                    self.relay.accept(self.relay.value + 1)

                    disposable.insert {
                        self.lock.lock(); defer { self.lock.unlock() }
                        self.relay.accept(self.relay.value - 1)
                    }

                    return true
                })
                .take(1)
                .flatMapLatest { _ in observable }
                .subscribe { observer.on($0) }

            _ = disposable.insert(relayDisposable)

            return disposable
        })
    }

}

И тогда я могу использовать это так:

let queue = ObservableQueue()

...

// first observable
let observable1 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable1)
    .subscribe(onNext: { _ in
        print("here1")
     })
    .disposed(by: rx.disposeBag)

// second observable
let observable2 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable2)
    .subscribe(onNext: { _ in
        print("here2")
    })
    .disposed(by: rx.disposeBag)

// third observable
let observable3 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable3)
    .subscribe(onNext: { _ in
        print("here3")
    })
    .disposed(by: rx.disposeBag)

Ответы [ 3 ]

0 голосов
/ 28 сентября 2018

Я бы использовал .combineLatest () для создания события, когда обе наблюдаемые что-то испустили.Смотри http://rxmarbles.com/#combineLatest

0 голосов
/ 29 сентября 2018

CLGeocoder имеет ту же проблему.Согласно документации, вы не можете вызвать один из методов геокодера, пока он работает с предыдущим запросом, очень похожим на то, что вы пытаетесь сделать.В этом разделе (https://gist.github.com/dtartaglia/64bda2a32c18b8c28e1e22085a05df5a), вы обнаружите, что я делаю наблюдаемые вызовы в фоновом потоке и защищаю работу семафором. Это ключ, вам нужен семафор, а не блокировка.

Что-токак это должно работать для вас:

class ObservableQueue {

    private let semaphore = DispatchSemaphore(value: 1)
    private let scheduler = ConcurrentDispatchQueueScheduler(qos: .userInitiated)

    func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
        let _semaphore = semaphore // To avoid the use of self in the block below
        return Observable.create { observer in
            _semaphore.wait()
            let disposable = observable.subscribe { event in
                switch event {
                case .next:
                    observer.on(event)
                case .error, .completed:
                    observer.on(event)
                }
            }
            return Disposables.create {
                disposable.dispose()
                _semaphore.signal()
            }
        }
        .subscribeOn(scheduler)
    }
}
0 голосов
/ 28 сентября 2018

Я дам вам несколько советов, которые, я думаю, помогут вам в будущем.

  1. Избегайте, насколько это возможно, Observable.create, это создание "грубой силы"наблюдаемый, и он вообще не справляется с обратным давлением, вам придется реализовать его самостоятельно, и это не что-то простое.

  2. Обычно для вызовов HTTP API, вы ненужно Observable, вы должны использовать Single или Completable, поскольку вы ожидаете только один ответ от вашего сервера, а не поток ответов.

  3. Вы должны быть осторожны с strong self внутриonNext/on..., как правило, если класс, подписывающийся на наблюдателя, имеет мешок для утилизации, вы должны использовать weak self.

Теперь для вашего конкретного случая, есливам нужна только эта пара наблюдателей (получить и отправить комментарий), я думаю, что очередь немного излишня.Вы можете просто вызвать обозреватель комментария (если он доступен) по методу do(onNext:) вашего обозревателя «получить».Do on next вызывается каждый раз, когда запускается событие onNext.

Если вам все еще нужна очередь, я бы пошел с OperationQueue, который ставит в очередь только операции и имеет метод, подобный observeOperationchanges() -> Observeble<Operation>, это будетзапускаться каждый раз, когда операция завершена.Таким образом, вы подписываетесь один раз и ставите в очередь несколько раз, но это может не соответствовать вашим потребностям.

...