Предполагая, что у вас есть функция, которая записывает блок данных, как эта тестовая функция:
func writeDataBlock(offset: Int, blockSize: Int) -> Observable<Int> {
let writtenBytesCount = min(Int(arc4random_uniform(5) + 5), blockSize)
return Observable<Int>.just(writtenBytesCount)
}
В действительности эта функция также использует некоторый буфер данных и пытается выдвинуть блок заданногоразмер из этих данных с заданным смещением, и возвращает количество байтов, записанных, когда сделано.Здесь вы можете использовать свою логику из transferImage
.
. Тогда вся передаточная функция может быть написана с использованием рекурсии следующим образом:
func writeAllDataRec(offset: Int, totalSize: Int, observer: AnyObserver<String>) {
guard offset < totalSize else {
observer.onNext("writeAllData completed")
observer.onCompleted()
return
}
var subscriber: Disposable?
let blockSize = min(10, totalSize - offset)
subscriber = writeDataBlock(offset: offset, blockSize: blockSize).subscribe { ev in
switch ev {
case let .next(writtenBytesCount):
debugPrint("writeNextBlock from offset: \(offset); writtenBytesCount = \(writtenBytesCount)")
let newOffset = offset + writtenBytesCount
writeAllDataRec(offset: newOffset, totalSize: totalSize, observer: observer)
case .completed:
subscriber?.dispose()
//debugPrint("writeAllData block completed")
case let .error(err):
subscriber?.dispose()
observer.onError(err)
observer.onCompleted()
}
}
}
func writeAllData(totalSize: Int) -> Observable<String> {
return Observable<String>.create { (observer) -> Disposable in
writeAllDataRec(offset: 0, totalSize: totalSize, observer: observer)
return Disposables.create()
}
}
Это можно проверить следующим образом:
var subscriber: Disposable?
...
self.subscriber = writeAllData(totalSize: 100).subscribe(onNext: { (message) in
print("\(message)")
})
В этом решении я предполагаю, что ваш следующий блок данных зависит от того, сколько данных было записано ранее, и что нет общего глобального состояния.
Это можно упростить слогика RxJS расширяется , но, к сожалению, эта функция отсутствует в RxSwift (4.1.2).