Swift Combine Chunk Operator - PullRequest
       18

Swift Combine Chunk Operator

3 голосов
/ 17 апреля 2020

Я пытаюсь создать фрагменты потока в платформе Combine от Apple.

Я собираюсь сделать что-то вроде этого:

Stream a:
--1-2-3-----4-5--->

Stream b:
--------0-------0->

a.chunk(whenOutputFrom: b)

-------[1, 2, 3]---[4, 5]-->

Может ли это быть реализовано в Combine?

1 Ответ

1 голос
/ 17 апреля 2020

Вы ищете оператор buffer в мире ReactiveX.

В Combine нет встроенного оператора buffer (в смысле ReactiveX). Кажется, встроенный buffer больше похож на bufferCount в ReactiveX.

Я нашел этот ответ от Daniel T, который воссоздает оператор buffer в RxSwift, а также эта таблица , в которой рассказывается, как перенести RxSwift на Combine.

Однако в ответе Даниэля Т используется Observable.create, который недоступен в Combine. Я посмотрел немного глубже и нашел этот другой ответ , который воссоздает Observable.create в Combine.

Объединяя три вещи, которые я нашел (без каламбура), это то, что я придумал:

// -------------------------------------------------
// from https://stackoverflow.com/a/61035663/5133585
struct AnyObserver<Output, Failure: Error> {
    let onNext: ((Output) -> Void)
    let onError: ((Failure) -> Void)
    let onCompleted: (() -> Void)
}

struct Disposable {
    let dispose: () -> Void
}

extension AnyPublisher {
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        let subject = PassthroughSubject<Output, Failure>()
        var disposable: Disposable?
        return subject
            .handleEvents(receiveSubscription: { subscription in
                disposable = subscribe(AnyObserver(
                    onNext: { output in subject.send(output) },
                    onError: { failure in subject.send(completion: .failure(failure)) },
                    onCompleted: { subject.send(completion: .finished) }
                ))
            }, receiveCancel: { disposable?.dispose() })
            .eraseToAnyPublisher()
    }
}
// -------------------------------------------------  

// -------------------------------------------------
// adapted from https://stackoverflow.com/a/43413167/5133585
extension Publisher {

    /// collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
    func buffer<T: Publisher, U>(_ boundary: T) -> AnyPublisher<[Output], Failure> where T.Output == U {
        return AnyPublisher.create { observer in
            var buffer: [Output] = []
            let lock = NSRecursiveLock()
            let boundaryDisposable = boundary.sink(receiveCompletion: {
                _ in
            }, receiveValue: {_ in
                lock.lock(); defer { lock.unlock() }
                observer.onNext(buffer)
                buffer = []
            })
            let disposable = self.sink(receiveCompletion: { (event) in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .finished:
                    observer.onNext(buffer)
                    observer.onCompleted()
                case .failure(let error):
                    observer.onError(error)
                    buffer = []
                }
            }) { (element) in
                lock.lock(); defer { lock.unlock() }
                buffer.append(element)
            }
            return Disposable {
                disposable.cancel()
                boundaryDisposable.cancel()
            }
        }
    }
}
// -------------------------------------------------
...