RxSwift Пропустить события, пока не закончится собственная последовательность - PullRequest
0 голосов
/ 25 декабря 2018

У меня есть одна наблюдаемая (мы будем называть это триггером), которая может излучать много раз за короткий промежуток времени.Когда он выдает, я выполняю сетевой запрос, и я сохраняю результат с помощью оператора сканирования.

Моя проблема заключается в том, что я хотел бы дождаться завершения запроса, чтобы выполнить его снова.(Но так как сейчас, если триггер испускает 2 наблюдаемые, не имеет значения, завершился ли fetchData, он сделает это снова)

Бонус: я также хотел бы брать только первое каждые X секунд (Дебодинг не является решением, потому что он может излучать все время, и я хочу получать 1 каждые X секунд, это не газ и не потому, что если наблюдаемое излучает очень быстро 2 раза, я получу первую и вторую задержанные Х секунд)

Код:

trigger.flatMap { [unowned self] _ in
        self.fetchData()
        }.scan([], accumulator: { lastValue, newValue in
        return lastValue + newValue
    })

и fetchData:

func fetchData() -> Observable<[ReusableCellVMContainer]>

Триггер:

let trigger = Observable.of(input.viewIsLoaded, handle(input.isNearBottomEdge)).merge() 

1 Ответ

0 голосов
/ 25 декабря 2018

Извините, я неправильно понял, что вы пытались выполнить, в моем ответе ниже.

Оператор, который добьется того, что вы хотите, - flatMapFirst.Это будет игнорировать события от триггера до тех пор, пока fetchData() не будет завершен.

trigger
    .flatMapFirst { [unowned self] _ in
        self.fetchData()
    }
    .scan([], accumulator: { lastValue, newValue in
        return lastValue + newValue
    })

Я оставляю свой предыдущий ответ ниже на случай, если это поможет (если что-нибудь, у него есть «бонусный» ответ).


Проблема, которую вы испытываете, называется «обратным давлением», когда наблюдаемое производит значения быстрее, чем может справиться наблюдатель.

В этом конкретном случае я рекомендую вамне ограничивать запросы на выборку данных и вместо этого сопоставлять каждый запрос с ключом, а затем выдавать массив в следующем порядке:

trigger
    .enumerated()
    .flatMap { [unowned self] count, _ in
        Observable.combineLatest(Observable.just(count), self.fetchData())
    }
    .scan(into: [Int: Value](), accumulator: { lastValue, newValue in
        lastValue[newValue.0] = newValue.1
    })
    .map { $0.sorted(by: { $0.key < $1.key }).map { $0.value }}

Для того, чтобы вышеуказанное работало, вам необходимо:

extension ObservableType {
    func enumerated() -> Observable<(Int, E)> {
        let shared = share()
        let counter = shared.scan(0, accumulator: { prev, _ in return prev + 1 })
        return Observable.zip(counter, shared)
    }
}

Таким образом, ваши сетевые запросы запускаются как можно скорее, но вы не теряете порядок, в котором они сделаны.


Для вашего "бонуса" оператор buffer будет делать именно то, что вы хотите,Что-то вроде:

trigger.buffer(timeSpan: seconds, count: Int.max, scheduler: MainScheduler.instance)
    .map { $0.first }
...