Раздвижные окна в RxSwift - PullRequest
       30

Раздвижные окна в RxSwift

0 голосов
/ 17 декабря 2018

Исходя из фона RxJava, я не могу придумать стандартный подход для реализации скользящих окон в RxSwift.Например, у меня есть следующая последовательность событий:

1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...

Давайте представим, что передача события происходит дважды в секунду.То, что я хочу сделать, это преобразовать эту последовательность в последовательность буферов, каждый из которых содержит последние три секунды данных.Кроме того, каждый буфер должен выводиться один раз в секунду.Таким образом, результат будет выглядеть следующим образом:

[1,2,3,4,5,6], [3,4,5,6,7,8], [5,6,7,8,9,10], ...

Что бы я делал в RxJava, я бы использовал одну из перегрузок метода buffer следующим образом:

stream.buffer(3000, 1000, TimeUnit.MILLISECONDS)

Какойприводит именно к результату, который мне нужно достичь: последовательность буферов, каждый буфер генерируется один раз в секунду и содержит последние три секунды данных.

Я проверял документы RxSwift далеко и широко, и я не нашел никаких перегрузок оператора buffer, которые позволили бы мне это сделать.Я упускаю какой-то неочевидный (для пользователя RxJava, ofc) оператор?

Ответы [ 2 ]

0 голосов
/ 17 декабря 2018

Я изначально написал решение, используя пользовательский оператор.С тех пор я выяснил, как это можно сделать с помощью стандартных операторов.

extension ObservableType {

    func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        let trigger = Observable<Int>.timer(timeSpan, period: timeShift, scheduler: scheduler)
            .takeUntil(self.takeLast(1))

        let buffer = self
            .scan([Date: E]()) { previous, current in
                var next = previous
                let now = scheduler.now
                next[now] = current
                return next.filter { $0.key > now.addingTimeInterval(-timeSpan) }
        }

        return trigger.withLatestFrom(buffer)
            .map { $0.sorted(by: { $0.key <= $1.key }).map { $0.value } }
    }
}

Я оставляю свое исходное решение ниже для потомков:


Написание собственного оператора - эторешение здесь.

extension ObservableType {

    func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        return Observable.create { observer in
            var buf: [Date: E] = [:]
            let lock = NSRecursiveLock()
            let elementDispoable = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case let .next(element):
                    buf[Date()] = element
                case .completed:
                    observer.onCompleted()
                case let .error(error):
                    observer.onError(error)
                }
            }
            let spanDisposable = scheduler.schedulePeriodic((), startAfter: timeSpan, period: timeShift, action: { state in
                lock.lock(); defer { lock.unlock() }
                let now = Date()
                buf = buf.filter { $0.key > now.addingTimeInterval(-timeSpan) }
                observer.onNext(buf.sorted(by: { $0.key <= $1.key }).map { $0.value })
            })
            return Disposables.create([spanDisposable, elementDispoable])
        }
    }
}
0 голосов
/ 17 декабря 2018

После некоторых проб и ошибок я нашел следующее решение.Я не проверял, хотя, это всего лишь общая идея (прошу прощения за код, это мой второй день написания на Swift):

stream
    // split into a sequence of buffers where each buffer 
    // contains the data obtained during the last second  
    .buffer(timeSpan: RxTimeInterval(1), count: Int.max, scheduler: MainScheduler.instance)

    // put it into a temporary seed array; if array's length is 3 
    // then it contains last three seconds of data already
    // so we need to drop first second of data
    .scan([[AccelerometerReading]]()) accumulator: { (seed, lastSecond) -> [[AccelerometerReading]] in
        var mutable = seed
        if seed.count == 3 {
            mutable.remove(at: 0)
        }
        mutable.append(lastSecond)
        return mutable
    }

    // skip the first two windows as they're too short:
    // first one contains first second of data, second contains
    // first and second seconds of data
    .skip(2)

    // flatten a seed into one dimensional array
    .map { (window: [[AccelerometerReading]]) -> [AccelerometerReading] in
        window.flatMap { $0 }
    }
...