RxSwift несколько наблюдаемых на карте - PullRequest
2 голосов
/ 15 апреля 2019

Я столкнулся с ситуацией, когда мне нужно было получить API, который будет генерировать данные json зарегистрированных пользователей. Затем я должен был бы пройти через каждого пользователя и получить его аватар с удаленного URL-адреса и сохранить его на диск. Я могу выполнить это второе задание внутри subscribe, но это не лучшая практика. Я пытаюсь реализовать это с map, flatMap и т. Д.

Вот мой пример кода:

self.dataManager.getUsers()
            .observeOn(MainScheduler.instance)
            .subscribeOn(globalScheduler)
            .map{ [unowned self] (data) -> Users in
                var users = data
// other code for manipulating users goes here
// then below I am trying to use another loop to fetch their avatars

                if let cats = users.categories {
                    for cat in cats  {
                        if let profiles = cat.profiles {
                            for profile in profiles {
                                if let thumbnail = profile.thumbnail,
                                    let url = URL(string: thumbnail) {
                                    URLSession.shared.rx.response(request: URLRequest(url: url))
                                        .subscribeOn(MainScheduler.instance)
                                        .subscribe(onNext: { response in
                                            // Update Image
                                            if let img = UIImage(data: response.data) {
                                                try? Disk.save(img, to: .caches, as: url.lastPathComponent)
                                            }
                                        }, onError: { (error) in

                                        }).disposed(by: self.disposeBag)
                                }
                            }
                        }
                    }
                }

                return users
            }
            .subscribe(onSuccess: { [weak self] (users) in

            }).disposed(by: disposeBag)

В этом коде 2 проблемы. Первый - это rx on URLSession, который выполняет задачу в фоновом режиме в другом потоке, и нет способа подтвердить основной subscribe назад, когда эта операция завершится. Второй - с циклом и rx, который неэффективен, так как он должен генерировать несколько наблюдаемых и затем обрабатывать их.

Любая идея по улучшению этой логики приветствуется.

1 Ответ

3 голосов
/ 17 апреля 2019

Это была забавная головоломка.

«Специальный соус», который решает проблему, находится в этой строке:

.flatMap { 
    Observable.combineLatest($0.map { 
        Observable.combineLatest(
            Observable.just($0.0), 
            URLSession.shared.rx.data(request: $0.1)
                .materialize()
        ) 
    }) 
}

map перед строкой создает Observable<[(URL, URLRequest)]>, и рассматриваемая строка преобразует его в Observable<[(URL, Event<Data>)]>.

Строка делает это следующим образом:

  1. Настройка сетевого вызова для создания Observable<Data>
  2. Материализуйте его, чтобы создать Observable<Event<Data>> (это сделано для того, чтобы ошибка в одной загрузке не закрывала весь поток.) ​​
  3. Поднимите URL обратно в Observable, что дает нам Observable<URL>
  4. Объедините наблюдаемые из шагов 2 и 3, чтобы получить Observable<(URL, Event<Data>)>.
  5. Отобразить каждый элемент массива для получения [Observable<(URL, Event<Data>)>]
  6. Объедините наблюдаемые в этом массиве, чтобы в итоге получить Observable<[(URL, Event<Data>)]>

Вот код

// manipulatedUsers is for the code you commented out.
// users: Observable<Users>
let users = self.dataManager.getUsers()
    .map(manipulatedUsers) // manipulatedUsers(_ users: Users) -> Users
    .asObservable()
    .share(replay: 1)

// this chain is for handling the users object. You left it blank in your code so I did too.
users
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { users in

    })
    .disposed(by: disposeBag)

// This navigates through the users structure and downloads the images.
// images: Observable<(URL, Event<Data>)>
let images = users.map { $0.categories ?? [] }
    .map { $0.flatMap { $0.profiles ?? [] } }
    .map { $0.compactMap { $0.thumbnail } }
    .map { $0.compactMap { URL(string: $0) } }
    .map { $0.map { ($0, URLRequest(url: $0)) } }
    .flatMap { 
        Observable.combineLatest($0.map { 
            Observable.combineLatest(
                Observable.just($0.0), 
                URLSession.shared.rx.data(request: $0.1)
                    .materialize()
            ) 
        }) 
    }
    .flatMap { Observable.from($0) }
    .share(replay: 1)

// this chain filters out the errors and saves the successful downloads.
images
    .filter { $0.1.element != nil }
    .map { ($0.0, $0.1.element!) }
    .map { ($0.0, UIImage(data: $0.1)!) }
    .observeOn(MainScheduler.instance)
    .bind(onNext: { url, image in
        try? Disk.save(image, to: .caches, as: url.lastPathComponent)
        return // need two lines here because this needs to return Void, not Void?
    })
    .disposed(by: disposeBag)

// this chain handles the download errors if you want to.
images
    .filter { $0.1.error != nil }
    .bind(onNext: { url, error in
        print("failed to download \(url) because of \(error)")
    })
    .disposed(by: disposeBag)
...