Как вы агрегируете данные из DispatchQueue.concurrentPerform (), используя GCD? - PullRequest
0 голосов
/ 14 апреля 2020

Как можно агрегировать данные при использовании ConcurrentPerform () Grand Central Dispatch?

Я делаю то, что в коде ниже, но resultDictionary, похоже, теряет все свои данные, когда заканчивается блок notify () , Таким образом, все, что я получаю, это пустой словарь, который возвращается из функции.

Я не уверен, почему это происходит, потому что когда я печатаю или устанавливаю точку останова, я вижу, что в resultDictionary есть что-то перед блоком заканчивается.

    let getCVPDispatchQueue = DispatchQueue(label: "blarg",
                                        qos: .userInitiated,
                                        attributes: .concurrent)
    let getCVPDispatchGroup = DispatchGroup()

    var resultDictionary = dataIDToSRLParticleDictionary()

    getCVPDispatchQueue.async { [weak self] in
        guard let self = self else { return }
        DispatchQueue.concurrentPerform(iterations: self.dataArray.count) { [weak self] (index) in
            guard let self = self else { return }
            let data = self.dataArray[index]
            getCVPDispatchGroup.enter()
            let theResult = data.runPartcleFilterForClosestParticleAndMaybeStopAudio()
            switch theResult {
                case .success(let CVParticle):
                    // If there was a CVP found, add it to the set.
                    if let theCVParticle = CVParticle {
                        self.dataIDsToCVPDictionary.addTodataIDToCVPDict(key: data.ID,
                                                                            value: theCVParticle)
                    }
                case .failure(let error):
                    os_log(.error, log: self.logger, "rundatasProcessing error: %s", error.localizedDescription)
                    self._isActive = false
            }
            getCVPDispatchGroup.leave()
        }

        getCVPDispatchGroup.notify(queue: .main) { [weak self] in
            guard let self = self else { return }
            print("DONE with \(self.dataIDsToCVPDictionary.getDictionary.count)")
            resultDictionary = self.dataIDsToCVPDictionary.getDictionary
            print("resultDictionary has \(self.dataIDsToCVPDictionary.getDictionary.count)")
        }

    }

    print("Before Return  with \(resultDictionary.count)")

    return resultDictionary

}

Не уверен, поможет ли это, но это простой класс, который я сделал для безопасного доступа к потоку словаря.

class DATASynchronizedIDToParticleDictionary {
    var unsafeDictionary: DATAIDToDATAParticleDictionary = DATAIDToDATAParticleDictionary()
    let accessQueue = DispatchQueue(label: "blarg2",
                                    qos: .userInitiated,
                                    attributes: .concurrent)
    var getDictionary: DATAIDToDATAParticleDictionary {
        get {
            var dictionaryCopy: DATAIDToDATAParticleDictionary!
            accessQueue.sync {
                dictionaryCopy = unsafeDictionary
            }
            return dictionaryCopy
        }
    }
    func addToDATAIDToCVPDict(key: String, value: DATAParticle) {
        accessQueue.async(flags: .barrier) { [weak self] in
            guard let self = self else { return }
            self.unsafeDictionary[key] = value
        }
    }
    func clearDictionary() {
        accessQueue.async(flags: .barrier) { [weak self] in
            guard let self = self else { return }
            self.unsafeDictionary.removeAll()
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 16 апреля 2020

Вы сказали:

Я делаю то, что в коде ниже, но resultDictionary, похоже, теряет все свои данные, когда заканчивается блок notify(). Таким образом, все, что я получаю, это пустой словарь, который возвращается из функции.

Проблема в том, что вы пытаетесь return значение, которое вычисляется асинхронно. Скорее всего, вы хотите перейти к шаблону блока завершения.

Кроме того, группа отправки не нужна. По иронии судьбы, concurrentPerform является синхронным (то есть он не продолжается до тех пор, пока распараллеленный for l oop не закончится). Поэтому нет смысла использовать notify, если вы знаете, что после concurrentPerform не доберетесь до линии, пока не будут выполнены все итерации.

Я бы также не рекомендовал иметь concurrentPerform l oop обновить свойства. Это подвергает вас множеству проблем. Например, что, если основной поток взаимодействовал с этим объектом одновременно? Конечно, вы можете синхронизировать свой доступ, но он может быть неполным. Вероятно, безопаснее, чтобы он обновлял только локальные переменные, и чтобы вызывающая сторона выполняла обновление свойства в своем блоке обработчика завершения. Очевидно, что вы можете go впереди и обновлять свойства (особенно если вы хотите обновить свой пользовательский интерфейс, чтобы отразить прогресс в полете), но это добавляет дополнительную складку к коду, которая может не потребоваться. Ниже я предположил, что в этом нет необходимости.

Кроме того, хотя я ценю намерения, стоящие за всеми этими [weak self] ссылками, они действительно не нужны, особенно в вашем классе синхронизации DATASynchronizedIDToParticleDictionary. Мы часто используем weak ссылки, чтобы избежать сильных циклов ссылок. Но если у вас нет сильных ссылок, они просто увеличивают накладные расходы, если у вас нет других неотложных потребностей.

ОК, так что давайте углубимся в код.

  • Сначала я бы удалил специализированный DATASynchronizedIDToParticleDictionary с универсальным универсальным кодом c:

    class SynchronizedDictionary<Key: Hashable, Value> {
        private var _dictionary: [Key: Value]
        private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".dictionary", qos: .userInitiated, attributes: .concurrent)
    
        init(_ dictionary: [Key: Value] = [:]) {
            _dictionary = dictionary
        }
    
        var dictionary: [Key: Value] {
            queue.sync { _dictionary }
        }
    
        subscript(key: Key) -> Value? {
            get { queue.sync                   { _dictionary[key] } }
            set { queue.async(flags: .barrier) { self._dictionary[key] = newValue } }
        }
    
        func removeAll() {
            queue.async(flags: .barrier) {
                self._dictionary.removeAll()
            }
        }
    }
    

    Примечание. Я удалил ненужные weak ссылок. Я также переименовал addToDATAIDToCVPDict и clearDictionary с более естественным оператором подписки и методом removeAll, который более точно отражает интерфейс базового типа Dictionary. Это приводит к более естественному виду кода. (И поскольку это обобщенный c, мы можем использовать его для любого словаря, который нуждается в такого рода синхронизации низкого уровня.)

    В любом случае, теперь вы можете объявить синхронизированное представление словаря следующим образом:

    let particles = SynchronizedDictionary(dataIDToSRLParticleDictionary())
    

    И когда я хочу обновить словарь с некоторым значением, вы можете сделать:

    particles[data.ID] = theCVParticle
    

    А когда я хочу получить фактический базовый, упакованный словарь, я могу сделать:

    let finalResult = particles.dictionary
    
  • Пока мы занимаемся этим, поскольку нам может потребоваться отслеживать массив ошибок, которые необходимо синхронизировать, я мог бы добавить массив эквивалентного типа:

    class SynchronizedArray<Value> {
        private var _array: [Value]
        private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".array", qos: .userInitiated, attributes: .concurrent)
    
        init(_ dictionary: [Value] = []) {
            _array = dictionary
        }
    
        var array: [Value] {
            queue.sync { _array }
        }
    
        subscript(index: Int) -> Value {
            get { queue.sync                   { _array[index] } }
            set { queue.async(flags: .barrier) { self._array[index] = newValue } }
        }
    
        func append(_ value: Value) {
            queue.async(flags: .barrier) {
                self._array.append(value)
            }
        }
    
        func removeAll() {
            queue.async(flags: .barrier) {
                self._array.removeAll()
            }
        }
    }
    
  • Теперь мы можем обратить наше внимание на основную рутину. Поэтому вместо того, чтобы возвращать значение, мы вместо этого дадим ему обработчик завершения @escaping. И, как уже говорилось выше, мы удалили бы ненужную группу рассылки:

    func calculateAllClosestParticles(completion: @escaping ([String: CVParticle], [Error]) -> Void) {
        let queue = DispatchQueue(label: "blarg", qos: .userInitiated, attributes: .concurrent)
        let particles = SynchronizedDictionary(dataIDToSRLParticleDictionary())
        let errors = SynchronizedArray<Error>()
    
        queue.async {
            DispatchQueue.concurrentPerform(iterations: self.dataArray.count) { index in
                let data = self.dataArray[index]
                let result = data.runPartcleFilterForClosestParticleAndMaybeStopAudio()
    
                switch result {
                case .success(let cvParticle):
                    // If there was a CVP found, add it to the set.
                    if let cvParticle = cvParticle {
                        particles[data.ID] = cvParticle
                    }
    
                case .failure(let error):
                    errors.append(error)
                }
            }
    
            DispatchQueue.main.async {
                completion(particles.dictionary, errors.array)
            }
        }
    }
    

    Теперь я не знаю, какие типы словаря были правильными для словаря, поэтому вам может потребоваться настроить параметры completion. И вы не предоставили остальные подпрограммы, поэтому я могу ошибиться в некоторых деталях. Но не заблудитесь в деталях, а просто обратите внимание на тщательное избегание свойств в concurrentPerform и передачу результатов обратно в обработчик завершения.

    Вы бы назвали это так:

    calculateAllClosestParticles { dictionary, errors in
        guard errors.isEmpty else { return }
    
        // you can access the dictionary and updating the model and UI here
        self.someProperty = dictionary
        self.tableView.reloadData()
    }
    
    // but don't try to access the dictionary here, because the asynchronous code hasn't finished yet
    //
    
  • FWIW, хотя я использовал шаблон чтения-записи, который вы использовали в своем примере, по моему опыту, NSLock на самом деле более производительный для быстрой синхронизации, особенно когда вы используете concurrentPerform, которое может ie задействовать все ядра вашего процессора, например,

    class SynchronizedDictionary<Key: Hashable, Value> {
        private var _dictionary: [Key: Value]
        private let lock = NSLock()
    
        init(_ dictionary: [Key: Value] = [:]) {
            _dictionary = dictionary
        }
    
        var dictionary: [Key: Value] {
            lock.synchronized { _dictionary }
        }
    
        subscript(key: Key) -> Value? {
            get { lock.synchronized { _dictionary[key] } }
            set { lock.synchronized { _dictionary[key] = newValue } }
        }
    
        func removeAll() {
            lock.synchronized {
                _dictionary.removeAll()
            }
        }
    }
    

    Где

    extension NSLocking {
        func synchronized<T>(_ closure: () throws -> T) rethrows -> T {
            lock()
            defer { unlock() }
            return try closure()
        }
    }
    

    В итоге вы не хотите принудительно переключать контекст для синхронизации, если вам не нужно.

  • При выполнении одновременного выполнения, если у вас много dataPoints и если время, требуемое для каждого вызова runPartcleFilterForClosestParticleAndMaybeStopAudio, является скромным, вы можете рассмотреть «шаг», выполняя несколько назначений данных в каждой итерации. Это выходит за рамки этого вопроса, но только к вашему сведению.

0 голосов
/ 15 апреля 2020

Не совсем уверен, что я сделал, но я переместил

resultDictionary = self.dataIDsToCVPDictionary.getDictionary

за пределы первого блока asyn c, и это, похоже, позволило сохранить / сохранить данные для возврата функции.

...