RXswift, повторить наблюдаемый до завершения - PullRequest
0 голосов
/ 05 июня 2018

Я пытаюсь загрузить прошивку на устройство BLE.Поэтому я нажимаю некоторые данные, жду подтверждения, а затем нажимаю еще, и так далее.Но я борюсь с повторением наблюдателя до тех пор, пока наблюдатель не завершит его.

Это наблюдатель, поэтому, если больше нет отправляемых пакетов, он должен завершиться и прекратить повторение.

let transferImage = Observable<Void>.create { (obs) -> Disposable in
        guard let nextPacket = dataSendingTracker.getNextPacket() else {
            obs.onCompleted()
            return Disposables.create()
        }

        return self.instrument()
            .flatMap{ $0.sendFWpacket(packet: nextPacket) }
            .subscribe(onNext: { () in
                obs.onNext(())
            }, onError: { (error) in
                obs.onError(error)
            })
    }

Любое предложение о том, как мне этого добиться?

Ответы [ 2 ]

0 голосов
/ 05 июня 2018

Предполагая, что у вас есть функция, которая записывает блок данных, как эта тестовая функция:

func writeDataBlock(offset: Int, blockSize: Int) -> Observable<Int> {
    let writtenBytesCount = min(Int(arc4random_uniform(5) + 5), blockSize)
    return Observable<Int>.just(writtenBytesCount)
}

В действительности эта функция также использует некоторый буфер данных и пытается выдвинуть блок заданногоразмер из этих данных с заданным смещением, и возвращает количество байтов, записанных, когда сделано.Здесь вы можете использовать свою логику из transferImage.

. Тогда вся передаточная функция может быть написана с использованием рекурсии следующим образом:

func writeAllDataRec(offset: Int, totalSize: Int, observer: AnyObserver<String>) {
    guard offset < totalSize else {
        observer.onNext("writeAllData completed")
        observer.onCompleted()
        return
    }

    var subscriber: Disposable?
    let blockSize = min(10, totalSize - offset)
    subscriber = writeDataBlock(offset: offset, blockSize: blockSize).subscribe { ev in
        switch ev {
        case let .next(writtenBytesCount):
            debugPrint("writeNextBlock from offset: \(offset); writtenBytesCount = \(writtenBytesCount)")
            let newOffset = offset + writtenBytesCount
            writeAllDataRec(offset: newOffset, totalSize: totalSize, observer: observer)
        case .completed:
            subscriber?.dispose()
            //debugPrint("writeAllData block completed")
        case let .error(err):
            subscriber?.dispose()
            observer.onError(err)
            observer.onCompleted()
        }
    }
}

func writeAllData(totalSize: Int) -> Observable<String> {
    return Observable<String>.create { (observer) -> Disposable in
        writeAllDataRec(offset: 0, totalSize: totalSize, observer: observer)
        return Disposables.create()
    }
}

Это можно проверить следующим образом:

var subscriber: Disposable?
...
    self.subscriber = writeAllData(totalSize: 100).subscribe(onNext: { (message) in
        print("\(message)")
    })

В этом решении я предполагаю, что ваш следующий блок данных зависит от того, сколько данных было записано ранее, и что нет общего глобального состояния.

Это можно упростить слогика RxJS расширяется , но, к сожалению, эта функция отсутствует в RxSwift (4.1.2).

0 голосов
/ 05 июня 2018

Попробуйте что-то вроде этого, но я считаю это уродливым ... не Rx Решение

TransferImage.takeWhile остановится, когда getNextPacket вернет nil или в этом случае Integer будет меньше нуля ..

     func test() {
    let transferImage = Observable<Int>.create { (obs) -> Disposable in
        if let nextPacket = self.getNextPacket() {
            obs.onNext(nextPacket)

        } else{
            obs.onNext(-1)
            obs.onCompleted()

        }
        return Disposables.create()
    }

    transferImage.takeWhile{$0 > 0}.subscribe(onNext: { nextPacket in
        print(nextPacket)
        let completed = self.sendFWpacket()
        if !completed {
            self.test()
        }

    }, onError: { error in
        print(error)
    }, onCompleted: {
        print("onCompleted")
    }) {
        print("disposed")
        }.disposed(by: disposeBag)

}

func sendFWpacket()-> Bool {
    return false
}

func getNextPacket() ->  Int? {
    return 1
}
...