Великий Центральный Диспетчер для сложного потока? - PullRequest
0 голосов
/ 26 апреля 2018

У меня есть a, b, c, d, e трудоемкие задачи с обработчиком завершения.

Между ними есть ограничения:

  1. Оба b & c ожидание a до конца
  2. Последнее задание e ожидает b & c & d до финиша

enter image description here

если задачи нет d , я мог бы написать код на swift, как это (еще не проверено)

let group = DispatchGroup()

group.enter()
a() { group.leave() }
group.wait()

group.enter()
b() { group.leave() }

group.enter()
c() { group.leave() }

group.notify(queue: .main) {
    e()
}

Как добавить задачу d без ожидания a для завершения?


Отредактировано 4/30 10:00 (+8)

Код Различный сказал

самый простой подходсделать функцию загрузки синхронной и добавить предупреждение в документацию о том, что ее нельзя вызывать из основного потока.

Поэтому я сделал версию на ее основе.Этот способ не может обработать возвращаемые значения от одновременных вызовов.Но это действительно похоже на асинхронное ожидание.Так что я доволен сейчас.Спасибо вам, ребята.

асинхронная / ожидающая часть похожа на

    myQueue.async {
        downloadSync("A")
        downloadSync("B", isConcurrent: true)
        downloadSync("C", isConcurrent: true)
        downloadSync("D", 4, isConcurrent: true)
        waitConcurrentJobs()
        downloadSync("E")
    }

А полный код приведен ниже.

    let myGroup = DispatchGroup()
    let myQueue = DispatchQueue(label: "for Sync/Blocking version of async functions")

    func waitConcurrentJobs() {
        myGroup.wait()
    }

    // original function (async version, no source code)
    func download(_ something: String, _ seconds: UInt32 = 1, completionHandler: @escaping ()->Void = {}) {
        print("Downloading \(something)")
        DispatchQueue.global().async {
            sleep(seconds)
            print("\(something) is downloaded")
            completionHandler()
        }
    }

    // wrapped function (synced version)
    // Warning:
    // It blocks current thead !!!
    // Do not call it on main thread
    func downloadSync(
        _ something: String,
        _ seconds: UInt32 = 1,
        isConcurrent: Bool = false
        ){
        myGroup.enter()
        download(something, seconds) { myGroup.leave() }
        if !isConcurrent {
            myGroup.wait()
        }
    }

    // Now it really looks like ES8 async/await
    myQueue.async {
        downloadSync("A")
        downloadSync("B", isConcurrent: true)
        downloadSync("C", isConcurrent: true)
        downloadSync("D", 4, isConcurrent: true)
        waitConcurrentJobs()
        downloadSync("E")
    }

результаты

enter image description here

Ответы [ 3 ]

0 голосов
/ 28 апреля 2018

Я хотел бы показать альтернативное решение, использующее Scala-подобные фьючерсы:

let result = funcA().flatMap { resultA in
    return [funcB(param: resultA.0),
            funcC(param: resultA.1),
            funcD()]
        .fold(initial: [String]()) { (combined, element) in
            return combined + [element]
    }
}.flatMap { result in
    return funcE(param: result)
}.map { result in
    print(result)
}

Вот и все в принципе.Он обрабатывает ошибки (неявно) и является потокобезопасным.Нет подклассов операций;)

Обратите внимание, что funcD будет вызываться только после успешного завершения A.Поскольку funcA() может потерпеть неудачу, называть это бессмысленно.Но код можно легко адаптировать, чтобы сделать это возможным, даже если это необходимо.

Сравните это с функцией foo() из моего другого решения, использующего группы рассылки и очереди рассылки.

Нижепример определений асинхронных функций, каждая из которых передает свой результат следующему:

func funcA() -> Future<(String, String)> {
    print("Start A")
    let promise = Promise<(String, String)>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
        print("Complete A")
        promise.complete(("A1", "A2"))
    }
    return promise.future
}

func funcB(param: String) -> Future<String> {
    print("Start B")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
        print("Complete B")
        promise.complete("\(param) -> B")
    }
    return promise.future
}

func funcC(param: String) -> Future<String> {
    print("Start C")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
        print("Complete C")
        promise.complete("\(param) -> C")
    }
    return promise.future
}

func funcD() -> Future<String> {
    print("Start D")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 4) {
        print("Complete D")
        promise.complete("D")
    }
    return promise.future
}

func funcE(param: [String]) -> Future<String> {
    print("Start E")
    let promise = Promise<String>()
    DispatchQueue.global().asyncAfter(deadline: .now() + 4) {
        print("Complete E")
        promise.complete("\(param) -> E")
    }
    return promise.future
}

, который выводит это на консоль:

Start A Complete A Start B Start C Start D Complete B Complete C Complete D Start E Complete E ["A1 -> B", "A2 -> C", "D"] -> E

Подсказка: есть несколько библиотек Future и Promise.

0 голосов
/ 02 августа 2018

Ваши первоначальные усилия кажутся мне очень близкими.Вы можете внести незначительную корректировку: сделать B, C и D группой, которая завершает запуск E.

A может быть другой группой, но так как это одна задачаЯ не вижу смысла.Запустите B и C, когда это будет сделано.

Обратите внимание, что в отличие от некоторых примеров кода в вашем вопросе и других ответах, в приведенном ниже коде, D и A могут начинаться сразуи работать параллельно.

let q = DispatchQueue(label: "my-queue", attributes: .concurrent)
let g = DispatchGroup()
func taskA() {  print("A")  }
func taskB() {  print("B"); g.leave()  }
func taskC() {  print("C"); g.leave()  }
func taskD() {  print("D"); g.leave()  }
func taskE() {  print("E")  }
g.enter()
g.enter()
g.enter()
q.async {
    taskA()
    q.async(execute: taskB)
    q.async(execute: taskC)
}
q.async(execute: taskD)
g.notify(queue: q, execute: taskE)
0 голосов
/ 27 апреля 2018

Редактировать: самый простой подход - сделать функцию download синхронной и добавить в документацию предупреждение о том, что ее никогда не следует вызывать из основного потока.Пирамида гибели для асинхронной функции - причина, по которой сопрограммы были предложены никем иным, как Крисом Латтнером, создателем Swift.По состоянию на апрель 2018 года, это еще не формальное предложение, ожидающее рассмотрения, поэтому есть вероятность, что вы не увидите его в Swift 5.

Синхронная функция загрузки:

// Never call this from main thread
func download(_ something: String, _ seconds: UInt32 = 1, completionHandler: @escaping ()->Void = {}) {
    let group = DispatchGroup()

    print("Downloading \(something)")
    group.enter()
    DispatchQueue.global().async {
        sleep(seconds)
        print("\(something) is downloaded")
        completionHandler()
        group.leave()
    }
    group.wait()
}

И NSOperation / NSOperationQueue настройка:

let opA = BlockOperation() {
    self.download("A")
}
let opB = BlockOperation() {
    self.download("B")
}
let opC = BlockOperation() {
    self.download("C")
}
let opD = BlockOperation() {
    self.download("D", 4)
}
let opE = BlockOperation() {
    self.download("E")
}

opB.addDependency(opA)
opC.addDependency(opA)

opE.addDependency(opB)
opE.addDependency(opC)
opE.addDependency(opD)

let operationQueue = OperationQueue()
operationQueue.addOperations([opA, opB, opC, opD, opE], waitUntilFinished: false)
...