Вы ищете оператор buffer
в мире ReactiveX.
В Combine нет встроенного оператора buffer
(в смысле ReactiveX). Кажется, встроенный buffer
больше похож на bufferCount
в ReactiveX.
Я нашел этот ответ от Daniel T, который воссоздает оператор buffer
в RxSwift, а также эта таблица , в которой рассказывается, как перенести RxSwift на Combine.
Однако в ответе Даниэля Т используется Observable.create
, который недоступен в Combine. Я посмотрел немного глубже и нашел этот другой ответ , который воссоздает Observable.create
в Combine.
Объединяя три вещи, которые я нашел (без каламбура), это то, что я придумал:
// -------------------------------------------------
// from https://stackoverflow.com/a/61035663/5133585
struct AnyObserver<Output, Failure: Error> {
let onNext: ((Output) -> Void)
let onError: ((Failure) -> Void)
let onCompleted: (() -> Void)
}
struct Disposable {
let dispose: () -> Void
}
extension AnyPublisher {
static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
let subject = PassthroughSubject<Output, Failure>()
var disposable: Disposable?
return subject
.handleEvents(receiveSubscription: { subscription in
disposable = subscribe(AnyObserver(
onNext: { output in subject.send(output) },
onError: { failure in subject.send(completion: .failure(failure)) },
onCompleted: { subject.send(completion: .finished) }
))
}, receiveCancel: { disposable?.dispose() })
.eraseToAnyPublisher()
}
}
// -------------------------------------------------
// -------------------------------------------------
// adapted from https://stackoverflow.com/a/43413167/5133585
extension Publisher {
/// collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
func buffer<T: Publisher, U>(_ boundary: T) -> AnyPublisher<[Output], Failure> where T.Output == U {
return AnyPublisher.create { observer in
var buffer: [Output] = []
let lock = NSRecursiveLock()
let boundaryDisposable = boundary.sink(receiveCompletion: {
_ in
}, receiveValue: {_ in
lock.lock(); defer { lock.unlock() }
observer.onNext(buffer)
buffer = []
})
let disposable = self.sink(receiveCompletion: { (event) in
lock.lock(); defer { lock.unlock() }
switch event {
case .finished:
observer.onNext(buffer)
observer.onCompleted()
case .failure(let error):
observer.onError(error)
buffer = []
}
}) { (element) in
lock.lock(); defer { lock.unlock() }
buffer.append(element)
}
return Disposable {
disposable.cancel()
boundaryDisposable.cancel()
}
}
}
}
// -------------------------------------------------