Планирование подписок в RxSwift - PullRequest
0 голосов
/ 10 января 2019

Я пытаюсь запланировать серию загрузок изображений. Я хочу вызывать scheduleImagesDownload столько раз, сколько захочу, но дождусь выполнения кода внутри downloadImages только после завершения предыдущего вызова.

Я пытаюсь получить и загрузить изображения с камеры для определенного идентификатора, как только все изображения будут загружены для этого идентификатора, я хочу начать загрузку изображений для следующего идентификатора и т. Д.

У меня трудные времена, потому что даже при использовании Serial Scheduler все подписки вызываются сразу, до завершения предыдущей загрузки. Мне было интересно, есть ли способ сделать это с чистым Rx, без использования семафоров и т. Д.

Заранее спасибо!


    func scheduleImagesDownload(flightId: String) -> Disposable {
        let subscription = donwloadImages(flightId)
            .subscribeOn(SerialDispatchQueueScheduler(qos: .background))
            .subscribe(onCompleted: {
                /// Finished downloading images for flight.
            }, onError: { error in
                /// Error downloading images for flight.
            })

        return subscription
    }

    func donwloadImages(_ flightId: String) -> Completable {
        return Completable.create { completable in

            /// Simulate querying the drone for images to download async and start downloading them.
            DispatchQueue.global().async {
                sleep(5) // Sleep to simulate downloading time.
                completable(.completed) // Finished downloading.
            }
            return Disposables.create()
        }
    }

Ответы [ 2 ]

0 голосов
/ 10 января 2019

Ключевым оператором для цепочки каждой операции загрузки изображения в очередь является ConcatMap. Я написал следующий фрагмент кода на основе ваших требований. Фрагмент говорит сам за себя.

let flightIds: [String] = [] // Array holding flightIds
let disposeBag = DisposeBag()

func download() {
    Observable.from(flightIds) // Convert array of flightIds into Observable chain
        .flatMap(getImageURLs) // For each flightId, get array of image URLs to be downloaded
        .flatMap(convertToImageURLObservable) // Convert array of image URLs into Observable chain
        .concatMap(downloadImage) // Concate each url in observable chain so each image will be downloaded sequencially
        .subscribeOn(SerialDispatchQueueScheduler(qos: .background)) // Scheduled entire chain on background queue
        .subscribe()
        .disposed(by: disposeBag)
}

/// Fetches image URLs for given flightId
func getImageURLs(_ flightId: String) -> Single<[URL]> {
    return Single<[URL]>.create { single in

        /// fetch & pass URLs in below array inside .success
        single(.success([]))

        return Disposables.create()
    }
}

/// Convert array of image URLs into Observable chain
func convertToImageURLObservable(_ urls: [URL]) -> Observable<URL> {
    return Observable.from(urls)
}

/// Downloads image for given URL
func downloadImage(_ url: URL) -> Completable {
    return Completable.create { completable in

        /// fetch image
        DispatchQueue.global().async {
            sleep(5) // Sleep to simulate downloading time.
            completable(.completed) // Finished downloading.
        }

        return Disposables.create()
    }
}
0 голосов
/ 10 января 2019

Я верю, что это то, что вы хотите. Вы в основном создаете массив / набор всех идентификаторов, которые вы получаете с камеры. Затем при каждом завершении вы удаляете элемент из своей коллекции и затем перезапускаете процесс, пока ваша коллекция не станет пустой.

    var cameraIds: Set<String>()

    func scheduleImagesDownload(flightId: String) -> Disposable {
        let subscription = donwloadImages(flightId)
            .subscribeOn(SerialDispatchQueueScheduler(qos: .background))
            .subscribe(onCompleted: {
                if let nextFlightId = cameraIds.first {
                  cameraIds.removeFirst()
                  scheduleImagesDownload(flightId: nextFlightId)
               } else {
                 // Finished downloads for all cameraIds and images
               }
            }, onError: { error in
                /// Error downloading images for flight.
            })

        return subscription
    }
...