Я пытаюсь выполнить свои Наблюдаемые, чтобы они выполнялись только после завершения предыдущего Наблюдения.Я не могу использовать flatMap, потому что подписки могут вызываться из разных мест, и эти Observables не связаны друг с другом.Чтобы быть конкретным: мой CollectionView загружает больше контента с сервера и через 2 секунды после этого пользователь нажимает кнопку «Отправить комментарий», пока CollectionView все еще загружает свой пакет.Поэтому я хочу подождать, пока обновление CollectionView завершится, и только после этого выполнить запрос на публикацию моего комментария.Я создал класс с именем ObservableQueue, и он работает просто отлично.Но мне нужно знать, есть ли такие проблемы, как утечки памяти, мертвые блокировки или, может быть, я просто что-то упустил.Вот оно:
extension CompositeDisposable {
@discardableResult
func insert(disposeAction: @escaping () -> ()) -> DisposeKey? {
return insert(Disposables.create(with: disposeAction))
}
}
class ObservableQueue {
private let lock = NSRecursiveLock()
private let relay = BehaviorRelay(value: 0)
private let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "ObservableQueue.scheduler")
func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
return Observable.create({ observer -> Disposable in
let disposable = CompositeDisposable()
let relayDisposable = self
.relay
.observeOn(self.scheduler)
.filter({ value -> Bool in
if value > 0 {
return false
}
self.lock.lock(); defer { self.lock.unlock() }
if self.relay.value > 0 {
return false
}
self.relay.accept(self.relay.value + 1)
disposable.insert {
self.lock.lock(); defer { self.lock.unlock() }
self.relay.accept(self.relay.value - 1)
}
return true
})
.take(1)
.flatMapLatest { _ in observable }
.subscribe { observer.on($0) }
_ = disposable.insert(relayDisposable)
return disposable
})
}
}
И тогда я могу использовать это так:
let queue = ObservableQueue()
...
// first observable
let observable1 = Observable
.just(0)
.delay(5, scheduler: MainScheduler.instance)
queue
.enqueue(observable1)
.subscribe(onNext: { _ in
print("here1")
})
.disposed(by: rx.disposeBag)
// second observable
let observable2 = Observable
.just(0)
.delay(5, scheduler: MainScheduler.instance)
queue
.enqueue(observable2)
.subscribe(onNext: { _ in
print("here2")
})
.disposed(by: rx.disposeBag)
// third observable
let observable3 = Observable
.just(0)
.delay(5, scheduler: MainScheduler.instance)
queue
.enqueue(observable3)
.subscribe(onNext: { _ in
print("here3")
})
.disposed(by: rx.disposeBag)