Как применить оператор Combine только после получения первого сообщения? - PullRequest
1 голос
/ 19 февраля 2020

В Combine, используя только встроенные операторы, есть ли способ пропустить оператор для первого значения, но затем применить этот оператор для всех последующих значений?

Примите во внимание следующее:

publisher
  .debounce(...)
  .sink(...)

В этом случае debounce будет ожидать истечения указанного времени ожидания, прежде чем передать значение в sink. Однако во многих случаях вы хотите, чтобы debounce запускался только после первого элемента. Например, если пользователь пытается отфильтровать список контактов, вполне возможно, что они вводят только одну букву в текстовое поле. Если это так, приложение, вероятно, должно начать фильтровать немедленно, не дожидаясь истечения времени ожидания debounce.

Я знаю о Drop издателях, но не могу найти их комбинация, которая будет выполнять больше операций «пропуска», так что sink получает каждое значение, но debounce игнорируется для первого значения.

Что-то вроде следующего:

publisher
  .if_first_element_passthrough_to_sink(...), else_debounce(...)
  .sink(...)

Возможно ли что-то подобное со встроенными операторами?

Разъяснение

Некоторые уточнения, поскольку мое первоначальное сообщение не было таким четким, как следовало бы ... Ответ, предоставленный Аспери ниже, очень близок, но в идеале первый элемент в последовательности всегда доставляется, затем запускается debounce.

Представьте, что пользователь вводит следующее:

AB C ... (пауза печатать на несколько секунд) ... D ... (пауза) ... EFG

То, что я хотел бы, это:

  • A, D и E доставляются немедленно.
  • B C объединяется в C с использованием debounce
  • F G объединяется в G с использованием debounce

Ответы [ 2 ]

1 голос
/ 19 февраля 2020

В вашем конкретном случае debounce вы можете предпочесть поведение throttle. Он отправляет первый элемент немедленно, а затем отправляет не более одного элемента на interval.

В любом случае, может сделать это с помощью встроенных объединений? Да, с некоторыми трудностями. Должен ты? Может быть ...

Вот мраморная диаграмма вашей цели:

marble diagram of modified debounce operator

Каждый раз, когда значение входит в кенни c -разрушитель , он запускает таймер (представленный заштрихованной областью). Если значение прибывает во время работы таймера, kenny c -debouncer сохраняет значение и перезапускает таймер. Когда время таймера истекает, и если какие-либо значения поступили во время его работы, kenny c -debouncer немедленно выдает последнее значение.

Оператор scan позволяет нам сохранять состояние, которое мы изменяем каждый раз, когда ввод поступает. Нам нужно отправить два вида входных данных в scan: выходные данные вышестоящего издателя и срабатывание таймера. Итак, давайте определим тип для этих входных данных:

fileprivate enum DebounceEvent<Value> {
    case value(Value)
    case timerFired
}

Какое состояние нам нужно внутри нашего scan преобразования? Нам определенно нужны планировщик, интервал и опции планировщика, чтобы мы могли устанавливать таймеры.

Нам также нужен PassthroughSubject, который мы можем использовать, чтобы превратить срабатывание таймера во входные данные для оператора scan.

Мы не можем фактически отменить и перезапустить таймер, поэтому вместо этого, когда таймер сработает, мы посмотрим, должен ли он был перезапущен. Если это так, мы запустим еще один таймер. Поэтому нам нужно знать, работает ли таймер и какие выходные данные отправлять при его срабатывании, а также время перезапуска таймера, если необходим перезапуск.

Поскольку вывод scan представляет собой все состояние значение, нам также нужно, чтобы состояние включало выходное значение для отправки в нисходящем направлении, если оно есть.

Вот тип состояния:

fileprivate struct DebounceState<Value, S: Scheduler> {
    let scheduler: S
    let interval: S.SchedulerTimeType.Stride
    let options: S.SchedulerOptions?

    let subject = PassthroughSubject<Void, Never>()

    enum TimerState {
        case notRunning
        case running(PendingOutput?)

        struct PendingOutput {
            var value: Value
            var earliestDeliveryTime: S.SchedulerTimeType
        }
    }

    var output: Value? = nil
    var timerState: TimerState = .notRunning
}

Теперь давайте посмотрим, как на самом деле использовать scan с некоторыми другими операторами для реализации kenny c версии debounce:

extension Publisher {
    func kennycDebounce<S: Scheduler>(
        for dueTime: S.SchedulerTimeType.Stride,
        scheduler: S,
        options: S.SchedulerOptions? = nil
    ) -> AnyPublisher<Output, Failure>
    {
        let initialState = DebounceState<Output, S>(
            scheduler: scheduler,
            interval: dueTime,
            options: options)
        let timerEvents = initialState.subject
            .map { _ in DebounceEvent<Output>.timerFired }
            .setFailureType(to: Failure.self)
        return self
            .map { DebounceEvent.value($0) }
            .merge(with: timerEvents)
            .scan(initialState) { $0.updated(with: $1) }
            .compactMap { $0.output }
            .eraseToAnyPublisher()
    }
}

Мы начнем с построения начального состояния для оператора scan.

Затем мы создадим издателя который превращает Void выходы состояния PassthroughSubject в события .timerFired.

Наконец, мы строим наш полный конвейер, который имеет четыре этапа:

  1. Превратите выходные сигналы (от self) в .value события.

  2. Объедините события значения с событиями таймера.

  3. Использование scan для обновления состояния отладки со значением и событиями таймера , Фактическая работа выполняется с помощью метода updated(with:), который мы добавим к DebounceState ниже.

  4. Отображение полного состояния вниз только на значение, которое мы хотим передать вниз по потоку, и отбрасывание NULL (которые происходят, когда события восходящего потока подавляются при отладке).

Осталось только написать метод updated(with:). Он просматривает тип каждого входящего события (value или timerFired) и состояние таймера, чтобы определить, каким должно быть новое состояние, и, при необходимости, установить новый таймер.

extension DebounceState {
    func updated(with event: DebounceEvent<Value>) -> DebounceState<Value, S> {
        var answer = self
        switch (event, timerState) {
        case (.value(let value), .notRunning):
            answer.output = value
            answer.timerState = .running(nil)
            scheduler.schedule(after: scheduler.now.advanced(by: interval), tolerance: .zero, options: options) { [subject] in subject.send() }
        case (.value(let value), .running(_)):
            answer.output = nil
            answer.timerState = .running(.init(value: value, earliestDeliveryTime: scheduler.now.advanced(by: interval)))
        case (.timerFired, .running(nil)):
            answer.output = nil
            answer.timerState = .notRunning
        case (.timerFired, .running(.some(let pendingOutput))):
            let now = scheduler.now
            if pendingOutput.earliestDeliveryTime <= now {
                answer.output = pendingOutput.value
                answer.timerState = .notRunning
            } else {
                answer.output = nil
                scheduler.schedule(after: pendingOutput.earliestDeliveryTime, tolerance: .zero, options: options) { [subject] in subject.send() }
            }
        case (.timerFired, .notRunning):
            // Impossible!
            answer.output = nil
        }
        return answer
    }
}

это работает? Давайте проверим это:

import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true

let subject = PassthroughSubject<String, Never>()
let q = DispatchQueue.main
let start = DispatchTime.now()
let cfStart = CFAbsoluteTimeGetCurrent()
q.asyncAfter(deadline: start + .milliseconds(100)) { subject.send("A") }
// A should be delivered at start + 100ms.
q.asyncAfter(deadline: start + .milliseconds(200)) { subject.send("B") }
q.asyncAfter(deadline: start + .milliseconds(300)) { subject.send("C") }
// C should be delivered at start + 800ms.
q.asyncAfter(deadline: start + .milliseconds(1100)) { subject.send("D") }
// D should be delivered at start + 1100ms.
q.asyncAfter(deadline: start + .milliseconds(1800)) { subject.send("E") }
// E should be delivered at start + 1800ms.
q.asyncAfter(deadline: start + .milliseconds(1900)) { subject.send("F") }
q.asyncAfter(deadline: start + .milliseconds(2000)) { subject.send("G") }
// G should be delivered at start + 2500ms.

let ticket = subject
    .kennycDebounce(for: .milliseconds(500), scheduler: q)
    .sink {
        print("\($0) \(((CFAbsoluteTimeGetCurrent() - cfStart) * 1000).rounded())") }

Вывод:

A 107.0
C 847.0
D 1167.0
E 1915.0
G 2714.0

Я не уверен, почему последующие события так задерживаются. Это могут быть просто побочные эффекты для детской площадки.

1 голос
/ 19 февраля 2020

Если я правильно понял ваши потребности, это может быть достигнуто на основе Concatenate следующим образом (в псевдокоде):

let originalPublisher = ...
let publisher = Publishers.Concatenate(
        prefix: originalPublisher.first(),
        suffix: originalPublisher.debounce(for: 0.5, scheduler: RunLoop.main))
    .eraseToAnyPublisher()

, поэтому префикс просто отправляет первым элемент ниже исходного издателя и завершен, затем суффикс просто пропустите все последующие элементы, используя debounce.

...