Как обрабатывать массив задач асинхронно с быстрым объединением - PullRequest
0 голосов
/ 02 ноября 2019

У меня есть издатель, который принимает сетевой вызов и возвращает массив идентификаторов. Теперь мне нужно вызвать другой сетевой вызов для каждого идентификатора, чтобы получить все мои данные. И я хочу, чтобы конечный издатель имел получившийся объект.

Первый сетевой результат:

"user": {
   "id": 0,
   "items": [1, 2, 3, 4, 5]
}

Конечный объект:

struct User {
    let id: Int
    let items: [Item]
    ... other fields ...
}
struct Item {
    let id: Int
    ... other fields ...
}

Обработка нескольких сетевых вызовов:

userPublisher.flatMap { user in
    let itemIDs = user.items
    return Future<[Item], Never>() { fulfill in
        ... OperationQueue of network requests ...
    }
}

Я бы хотел выполнять сетевые запросы параллельно, так как они не зависят друг от друга. Я не уверен, что Future здесь, но я думаю, что тогда у меня будет код для выполнения DispatchGroup или OperationQueue и выполнения после того, как все они будут готовы. Есть ли еще способ объединения?

У Doe Combine есть концепция разделения одного потока на множество параллельных потоков и объединения потоков вместе?

1 Ответ

0 голосов
/ 05 ноября 2019

Объединение предлагает расширения около URLSession для обработки сетевых запросов, если только вам действительно не нужно интегрироваться с сетью на OperationQueue, тогда Future - хороший кандидат. Вы можете запустить несколько Future с и собрать их в какой-то момент, но я бы действительно посоветовал посмотреть на URLSession расширения для Combine.

struct User: Codable {
   var username: String
}

let requestURL = URL(string: "https://example.com/")!
let publisher = URLSession.shared.dataTaskPublisher(for: requestURL)
    .map { $0.data }
    .decode(type: User.self, decoder: JSONDecoder())

Что касается запуска пакета запросов, то можно использоватьPublishers.MergeMany, то есть:

struct User: Codable {
   var username: String
}

let userIds = [1, 2, 3]

let subscriber = Just(userIds)
    .setFailureType(to: Error.self)
    .flatMap { (values) -> Publishers.MergeMany<AnyPublisher<User, Error>> in
    let tasks = values.map { (userId) -> AnyPublisher<User, Error> in
            let requestURL = URL(string: "https://jsonplaceholder.typicode.com/users/\(userId)")!

            return URLSession.shared.dataTaskPublisher(for: requestURL)
                .map { $0.data }
                .decode(type: User.self, decoder: JSONDecoder())
                .eraseToAnyPublisher()
    }
    return Publishers.MergeMany(tasks)
}.collect().sink(receiveCompletion: { (completion) in
    if case .failure(let error) = completion {
        print("Got error: \(error.localizedDescription)")
    }
}) { (allUsers) in
    print("Got users:")
    allUsers.map { print("\($0)") }
}

В приведенном выше примере я использую collect для сбора всех результатов, что откладывает отправку значения до Sink до тех пор, пока все сетевые запросы не будут успешно завершены, однако выможет избавиться от collect и получать каждый User в приведенном выше примере один за другим по мере выполнения сетевых запросов.

...