Объединить фреймворк, сериализовать asyn c операции - PullRequest
8 голосов
/ 15 января 2020

Как мне заставить асинхронные конвейеры, составляющие инфраструктуру Combine, выстраиваться синхронно (последовательно)?

Предположим, у меня есть 50 URL-адресов, с которых я хочу загрузить соответствующие ресурсы, и скажем, я хочу делай это по одному. Я знаю, как это сделать с помощью Operation / OperationQueue, например, с помощью подкласса Operation, который не объявляет себя завершенным до завершения загрузки. Как бы я сделал то же самое, используя Combine?

На данный момент все, что мне приходит в голову, - это сохранить глобальный список оставшихся URL-адресов и выдвинуть один, настроить один конвейер для одной загрузки, выполнить загрузите, и в sink конвейера повторите. Это не очень похоже на Combine.

Я попытался создать массив URL-адресов и сопоставить его с массивом издателей. Я знаю, что могу «создать» издателя и заставить его опубликовать sh по конвейеру, используя flatMap. Но тогда я все еще делаю все загрузки одновременно. Не существует какого-либо комбинированного способа обхода массива контролируемым образом - или есть?

(Я также представлял, что буду делать что-то с Future, но я безнадежно запутался. Я не привык к такому мышлению .)

Ответы [ 6 ]

3 голосов
/ 24 января 2020

Вы можете создать собственного подписчика, чтобы получать возвращающихся подписчиков. Demand.max (1). В этом случае подписчик будет запрашивать следующее значение только при его получении. Пример для Int.publisher, но некоторая случайная задержка в карте имитирует сетевой трафик c: -)

import PlaygroundSupport
import SwiftUI
import Combine

class MySubscriber: Subscriber {
  typealias Input = String
  typealias Failure = Never

  func receive(subscription: Subscription) {
    print("Received subscription", Thread.current.isMainThread)
    subscription.request(.max(1))
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    print("Received input: \(input)", Thread.current.isMainThread)
    return .max(1)
  }

  func receive(completion: Subscribers.Completion<Never>) {
    DispatchQueue.main.async {
        print("Received completion: \(completion)", Thread.current.isMainThread)
        PlaygroundPage.current.finishExecution()
    }
  }
}

(110...120)
    .publisher.receive(on: DispatchQueue.global())
    .map {
        print(Thread.current.isMainThread, Thread.current)
        usleep(UInt32.random(in: 10000 ... 1000000))
        return String(format: "%02x", $0)
    }
    .subscribe(on: DispatchQueue.main)
    .subscribe(MySubscriber())

print("Hello")

PlaygroundPage.current.needsIndefiniteExecution = true

Печать игровой площадки ...

Hello
Received subscription true
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 6e false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 6f false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 70 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 71 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 72 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 73 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 74 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 75 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 76 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 77 false
false <NSThread: 0x600000053400>{number = 3, name = (null)}
Received input: 78 false
Received completion: finished true

ОБНОВЛЕНИЕ наконец я нашел .flatMap(maxPublishers: ), что заставляет меня обновить эту интересную топи c с немного другим подходом. Пожалуйста, обратите внимание, что я использую глобальную очередь для планирования, а не только некоторую случайную задержку, просто чтобы быть уверенным, что получение сериализованного потока не является «случайным» или «счастливым» поведением: -)

import PlaygroundSupport
import Combine
import Foundation

PlaygroundPage.current.needsIndefiniteExecution = true

let A = (1 ... 9)
    .publisher
    .flatMap(maxPublishers: .max(1)) { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: DispatchQueue.global())
        }
}
.sink { value in
    print(value, "A")
}

let B = (1 ... 9)
    .publisher
    .flatMap { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: RunLoop.main)
        }
}
.sink { value in
    print("     ",value, "B")
}

print

1 A
      4 B
      5 B
      7 B
      1 B
      2 B
      8 B
      6 B
2 A
      3 B
      9 B
3 A
4 A
5 A
6 A
7 A
8 A
9 A

Исходя из написанного здесь

.serialize ()?

, определенного Клей Эллисом, принятый ответ можно заменить на

.publisher.flatMap (maxPublishers: .max (1)) {$ 0}

, в то время как в «несериализированной» версии необходимо использовать

.publisher.flatMap { $ 0}

"пример из реального мира"

import PlaygroundSupport
import Foundation
import Combine

let path = "postman-echo.com/get"
let urls: [URL] = "... which proves the downloads are happening serially .-)".map(String.init).compactMap { (parameter) in
    var components = URLComponents()
    components.scheme = "https"
    components.path = path
    components.queryItems = [URLQueryItem(name: parameter, value: nil)]
    return components.url
}
//["https://postman-echo.com/get?]
struct Postman: Decodable {
    var args: [String: String]
}


let collection = urls.compactMap { value in
        URLSession.shared.dataTaskPublisher(for: value)
        .tryMap { data, response -> Data in
            return data
        }
        .decode(type: Postman.self, decoder: JSONDecoder())
        .catch {_ in
            Just(Postman(args: [:]))
    }
}

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}

var streamA = ""
let A = collection
    .publisher.flatMap{$0}

    .sink(receiveCompletion: { (c) in
        print(streamA, "     ", c, "    .publisher.flatMap{$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamA)
    })


var streamC = ""
let C = collection
    .serialize()?

    .sink(receiveCompletion: { (c) in
        print(streamC, "     ", c, "    .serialize()?")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamC)
    })

var streamD = ""
let D = collection
    .publisher.flatMap(maxPublishers: .max(1)){$0}

    .sink(receiveCompletion: { (c) in
        print(streamD, "     ", c, "    .publisher.flatMap(maxPublishers: .max(1)){$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamD)
    })

PlaygroundPage.current.needsIndefiniteExecution = true

отпечатки

.w.h i.c hporves ht edownloadsa erh appeninsg eriall y.-)       finished     .publisher.flatMap{$0}
... which proves the downloads are happening serially .-)       finished     .publisher.flatMap(maxPublishers: .max(1)){$0}
... which proves the downloads are happening serially .-)       finished     .serialize()?

Мне также кажется очень полезным в других сценариях ios. Попробуйте использовать значение по умолчанию maxPublishers в следующем фрагменте и сравните результаты: -)

import Combine

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    //.publish
    .flatMap(maxPublishers: .max(1), { (pair)  in
        Just(pair)
    })
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

"Hello World!".map(String.init).forEach { (s) in
    subject.send(s)
}
subject.send(completion: .finished)
2 голосов
/ 24 января 2020

Исходный вопрос:

Я попытался создать массив URL-адресов и сопоставить его с массивом издателей. Я знаю, что могу «издать» издателя и заставить его опубликовать sh по конвейеру, используя flatMap. Но тогда я все еще делаю все загрузки одновременно. Нет никакого комбинированного способа обхода массива контролируемым образом - или есть?


Вот игрушечный пример для решения реальной проблемы:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap() {$0}
    .sink {print($0)}.store(in:&self.storage)

Издает целые числа от 1 до 10 в случайном порядке, поступая в случайное время. Цель состоит в том, чтобы сделать с collection что-то, что заставит его испускать целые числа от 1 до 10.


Теперь мы собираемся изменить только одну вещь: в строке

.flatMap {$0}

мы добавляем параметр maxPublishers:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap(maxPublishers:.max(1)) {$0}
    .sink {print($0)}.store(in:&self.storage)

Presto, теперь мы do выдаем целые числа от 1 до 10 по порядку, со случайными интервалами между их.


Давайте применим это к исходной задаче. Для демонстрации мне нужно довольно медленное соединение с Inte rnet и довольно большой ресурс для загрузки. Сначала я сделаю это с обычными .flatMap:

let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
    .map {URL(string:$0)!}
    .map {session.dataTaskPublisher(for: $0)
        .eraseToAnyPublisher()
}
collection.publisher.setFailureType(to: URLError.self)
    .handleEvents(receiveOutput: {_ in print("start")})
    .flatMap() {$0}
    .map {$0.data}
    .sink(receiveCompletion: {comp in
        switch comp {
        case .failure(let err): print("error", err)
        case .finished: print("finished")
        }
    }, receiveValue: {_ in print("done")})
    .store(in:&self.storage)

Результатом будет

start
start
start
done
done
done
finished

, который показывает, что мы делаем три загрузки одновременно. Хорошо, теперь измените

    .flatMap() {$0}

на

    .flatMap(maxPublishers:.max(1) {$0}

Результат теперь таков:

start
done
start
done
start
done
finished

Итак, мы сейчас скачиваем серийно, что изначально является проблемой для быть решенным.


append

В соответствии с принципом TIMTOWTDI, мы можем вместо этого связать издателей с append для их сериализации:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
let pub = collection.dropFirst().reduce(collection.first!) {
    return $0.append($1).eraseToAnyPublisher()
}

Результатом является издатель, который сериализует отложенных издателей в исходной коллекции. Давайте докажем это, подписавшись на него:

pub.sink {print($0)}.store(in:&self.storage)

Конечно, целые числа теперь приходят в порядке (со случайными интервалами между ними). ​​


Мы можем инкапсулировать создание pub из коллекции издателей с расширением коллекции, как предложил Клэй Эллис:

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}
2 голосов
/ 20 января 2020

Я только кратко проверил это, но при первом прохождении выясняется, что каждый запрос ждет, пока предыдущий запрос не завершится sh, прежде чем начать.

Я публикую это решение в поисках обратной связи. Пожалуйста, будьте критичны, если это не хорошее решение.

extension Collection where Element: Publisher {

    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbititary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }

        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!

        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }

        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()

        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }

        return output
    }
}


Более краткая версия этого решения (предоставлена ​​@matt):

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
2 голосов
/ 15 января 2020

Во всех остальных средах Reactive это действительно легко; вы просто используете concat, чтобы объединить и сгладить результаты за один шаг, а затем вы можете reduce результаты в окончательный массив. Apple делает это трудным, потому что Publisher.Concatenate не имеет перегрузки, которая принимает массив издателей. Есть похожая странность с Publisher.Merge. У меня есть ощущение, что это связано с тем, что они возвращают вложенные генерики c, а не просто возвращают один тип c, например, rx Observable. Я полагаю, что вы можете просто вызвать Concatenate в al oop и затем уменьшить объединенные результаты в один массив, но я действительно надеюсь, что они решат эту проблему в следующем выпуске. Безусловно, необходимо объединить более двух издателей и объединить более четырех издателей (а перегрузки для этих двух операторов даже не согласованы, что просто странно).

РЕДАКТИРОВАТЬ:

Я вернулся к этому и обнаружил, что вы действительно можете объединить произвольный массив издателей, и они будут издавать в последовательности. Я понятия не имею, почему не существует такой функции, как ConcatenateMany, чтобы сделать это для вас, но похоже, что если вы готовы использовать издателя со стертым типом, его не так сложно написать самостоятельно. Этот пример показывает, что слияние генерируется во временном порядке, а concat - в порядке комбинации:

import PlaygroundSupport
import SwiftUI
import Combine

let p = Just<Int>(1).append(2).append(3).delay(for: .seconds(0.25), scheduler: RunLoop.main).eraseToAnyPublisher()
let q = Just<Int>(4).append(5).append(6).eraseToAnyPublisher()
let r = Just<Int>(7).append(8).append(9).delay(for: .seconds(0.5), scheduler: RunLoop.main).eraseToAnyPublisher()
let concatenated: AnyPublisher<Int, Never> = [q,r].reduce(p) { total, next in
  total.append(next).eraseToAnyPublisher()
}

var subscriptions = Set<AnyCancellable>()

concatenated
  .sink(receiveValue: { v in
    print("concatenated: \(v)")
  }).store(in: &subscriptions)

Publishers
  .MergeMany([p,q,r])
  .sink(receiveValue: { v in
    print("merge: \(v)")
  }).store(in: &subscriptions)
1 голос
/ 15 января 2020

Вот код игровой площадки на одну страницу, который описывает возможный подход. Основная идея состоит в том, чтобы преобразовать асинхронные вызовы API c в цепочку из Future издателей, создавая последовательный конвейер.

Input: диапазон значений от 1 до 10, которые асинхронно в фоновой очереди преобразуются в строки

Демонстрация прямого вызова в asyn c API:

let group = DispatchGroup()
inputValues.map {
    group.enter()
    asyncCall(input: $0) { (output, _) in
        print(">> \(output), in \(Thread.current)")
        group.leave()
    }
}
group.wait()

Выход:

>> 1, in <NSThread: 0x7fe76264fff0>{number = 4, name = (null)}
>> 3, in <NSThread: 0x7fe762446b90>{number = 3, name = (null)}
>> 5, in <NSThread: 0x7fe7624461f0>{number = 5, name = (null)}
>> 6, in <NSThread: 0x7fe762461ce0>{number = 6, name = (null)}
>> 10, in <NSThread: 0x7fe76246a7b0>{number = 7, name = (null)}
>> 4, in <NSThread: 0x7fe764c37d30>{number = 8, name = (null)}
>> 7, in <NSThread: 0x7fe764c37cb0>{number = 9, name = (null)}
>> 8, in <NSThread: 0x7fe76246b540>{number = 10, name = (null)}
>> 9, in <NSThread: 0x7fe7625164b0>{number = 11, name = (null)}
>> 2, in <NSThread: 0x7fe764c37f50>{number = 12, name = (null)}

Демонстрация трубопровода комбайна:

Выход:

>> got 1
>> got 2
>> got 3
>> got 4
>> got 5
>> got 6
>> got 7
>> got 8
>> got 9
>> got 10
>>>> finished with true

Код:

import Cocoa
import Combine
import PlaygroundSupport

// Assuming there is some Asynchronous API with
// (eg. process Int input value during some time and generates String result)
func asyncCall(input: Int, completion: @escaping (String, Error?) -> Void) {
    DispatchQueue.global(qos: .background).async {
            sleep(.random(in: 1...5)) // wait for random Async API output
            completion("\(input)", nil)
        }
}

// There are some input values to be processed serially
let inputValues = Array(1...10)

// Prepare one pipeline item based on Future, which trasform Async -> Sync
func makeFuture(input: Int) -> AnyPublisher<Bool, Error> {
    Future<String, Error> { promise in
        asyncCall(input: input) { (value, error) in
            if let error = error {
                promise(.failure(error))
            } else {
                promise(.success(value))
            }
        }
    }
    .receive(on: DispatchQueue.main)
    .map {
        print(">> got \($0)") // << sideeffect of pipeline item
        return true
    }
    .eraseToAnyPublisher()
}

// Create pipeline trasnforming input values into chain of Future publishers
var subscribers = Set<AnyCancellable>()
let pipeline =
    inputValues
    .reduce(nil as AnyPublisher<Bool, Error>?) { (chain, value) in
        if let chain = chain {
            return chain.flatMap { _ in
                makeFuture(input: value)
            }.eraseToAnyPublisher()
        } else {
            return makeFuture(input: value)
        }
    }

// Execute pipeline
pipeline?
    .sink(receiveCompletion: { _ in
        // << do something on completion if needed
    }) { output in
        print(">>>> finished with \(output)")
    }
    .store(in: &subscribers)

PlaygroundPage.current.needsIndefiniteExecution = true
0 голосов
/ 13 апреля 2020

Используйте flatMap(maxPublishers:transform:) с .max(1), например,

func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
    Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
        .flatMap(maxPublishers: .max(1)) { $0 }
        .eraseToAnyPublisher()
}

Где

func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
    URLSession.shared.dataTaskPublisher(for: url)
        .compactMap { UIImage(data: $0.data) }
        .receive(on: RunLoop.main)
        .eraseToAnyPublisher()
}

и

var imageRequests: AnyCancellable?

func fetchImages() {
    imageRequests = imagesPublisher(for: urls).sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("done")
        case .failure(let error):
            print("failed", error)
        }
    }, receiveValue: { image in
        // do whatever you want with the images as they come in
    })
}

В результате:

serial

Но мы должны признать, что вы получаете большой удар производительности, делая их последовательно, вот так. Например, если я увеличу до 6 за раз, это будет более чем в два раза быстрее:

concurrent

Лично я бы рекомендовал загружать только последовательно, если вам абсолютно необходимо (что при загрузке серии изображений / файлов почти наверняка не так). Да, одновременное выполнение запросов может привести к тому, что они не будут завершены в определенном порядке, но мы просто используем структуру, которая не зависит от порядка (например, словарь, а не простой массив), но прирост производительности настолько значителен, что в целом это того стоит.

Но, если вы хотите, чтобы они загружались последовательно, параметр maxPublishers может достичь этого.

...