Использовать очередь и семафор для параллелизма и оболочки свойств? - PullRequest
2 голосов
/ 03 октября 2019

Я пытаюсь создать потокобезопасную оболочку свойства. Я мог только думать об GCD-очередях и семафорах как о самом быстром и надежном способе. Являются ли семафоры просто более производительными (если это так), или есть другая причина использовать один поверх другого для параллелизма?

Ниже приведены два варианта оберток атомарных свойств:

@propertyWrapper
struct Atomic<Value> {
    private var value: Value
    private let queue = DispatchQueue(label: "Atomic serial queue")

    var wrappedValue: Value {
        get { queue.sync { value } }
        set { queue.sync { value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

@propertyWrapper
struct Atomic2<Value> {
    private var value: Value
    private var semaphore = DispatchSemaphore(value: 1)

    var wrappedValue: Value {
        get {
            semaphore.wait()
            let temp = value
            semaphore.signal()
            return temp
        }

        set {
            semaphore.wait()
            value = newValue
            semaphore.signal()
        }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

struct MyStruct {
    @Atomic var counter = 0
    @Atomic2 var counter2 = 0
}

func test() {
    var myStruct = MyStruct()

    DispatchQueue.concurrentPerform(iterations: 1000) {
        myStruct.counter += $0
        myStruct.counter2 += $0
   }
}

КакМогут ли они быть должным образом протестированы и измерены, чтобы увидеть разницу между двумя реализациями, и если они вообще работают?

1 Ответ

3 голосов
/ 03 октября 2019

FWIW, другой вариант - это шаблон чтения-записи с параллельной очередью, где чтения выполняются синхронно, но разрешены для одновременного выполнения относительно других операций чтения, но записи выполняются асинхронно, но с барьером (т.е. не одновременно сна любые другие операции чтения или записи):

@propertyWrapper
class Atomic<Value> {
    private var value: Value
    private let queue = DispatchQueue(label: "com.domain.app.atomic", attributes: .concurrent)

    var wrappedValue: Value {
        get { queue.sync { value } }
        set { queue.async(flags: .barrier) { self.value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

Еще одна блокировка:

@propertyWrapper
struct Atomic<Value> {
    private var value: Value
    private var lock = NSLock()

    var wrappedValue: Value {
        get { lock.synchronized { value } }
        set { lock.synchronized { value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

где

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}

Мы должны признать, что пока эти и ваши, предлагает атомарность, он не собирается обеспечивать потокобезопасное взаимодействие.

Рассмотрим этот простой эксперимент, в котором мы увеличиваем целое число в миллион раз:

@Atomic var foo = 0

func threadSafetyExperiment() {
    DispatchQueue.global().async {
        DispatchQueue.concurrentPerform(iterations: 1_000_000) { _ in
            self.foo += 1
        }
        print(self.foo)
    }
}

Вы ожидаете fooбыть равным 1 000 000, но не будет. Это связано с тем, что все взаимодействие «извлекать значение, увеличивать его и сохранять его» необходимо заключить в единый механизм синхронизации.

Итак, вы вернулись к решениям, не относящимся к оберткам, например,

class Synchronized<Value> {
    private var _value: Value
    private let lock = NSLock()

    init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { lock.synchronized { _value } }
        set { lock.synchronized { _value = newValue } }
    }

    func synchronized(block: (inout Value) -> Void) {
        lock.synchronized {
            block(&_value)
        }
    }
}

И тогда это прекрасно работает:

var foo = Synchronized<Int>(0)

func threadSafetyExperiment() {
    DispatchQueue.global().async {
        DispatchQueue.concurrentPerform(iterations: 1_000_000) { _ in
            self.foo.synchronized { value in
                value += 1
            }
        }
        print(self.foo.value)
    }
}

Как их можно должным образом проверить и измерить, чтобы увидеть разницу между двумя реализациями и даже если они работают?

Несколько мыслей:

  • Я бы предложил сделать гораздо больше, чем 1000 итераций. Вы хотите сделать достаточно итераций, чтобы результаты измерялись в секундах, а не в миллисекундах. Лично я использовал миллион итераций.

  • Среда модульного тестирования идеальна как для проверки правильности, так и для измерения производительности с использованием метода measure (который повторяет тест производительности 10 раз длякаждый модульный тест и результаты будут отражены в отчетах о модульных тестах):

    enter image description here

    Итак, создайте проект с целью модульного теста (или добавьтеесли вам нужно, выполнить модульный тест для существующего проекта), а затем создать модульные тесты и выполнить их с помощью команды + u .

  • Есливы редактируете схему для своей цели, вы можете выбрать случайный порядок ваших тестов, чтобы убедиться, что порядок их выполнения не влияет на производительность:

    enter image description here

    Я бы также заставил цель тестирования использовать сборку релиза, чтобы убедиться, что вы тестируете оптимизированную сборку.

Это пример множества различныхсинхронизация с использованием последовательной очереди GCD, одновременнаяочередь, блокировки, недобросовестные блокировки, семафоры:

class SynchronizedSerial<Value> {
    private var _value: Value
    private let queue = DispatchQueue(label: "com.domain.app.atomic")

    required init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { queue.sync { _value } }
        set { queue.async { self._value = newValue } }
    }

    func synchronized<T>(block: (inout Value) throws -> T) rethrows -> T {
        try queue.sync {
            try block(&_value)
        }
    }

    func writer(block: @escaping (inout Value) -> Void) -> Void {
        queue.async {
            block(&self._value)
        }
    }
}

class SynchronizedReaderWriter<Value> {
    private var _value: Value
    private let queue = DispatchQueue(label: "com.domain.app.atomic", attributes: .concurrent)

    required init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { queue.sync { _value } }
        set { queue.async(flags: .barrier) { self._value = newValue } }
    }

    func synchronized<T>(block: (inout Value) throws -> T) rethrows -> T {
        try queue.sync(flags: .barrier) {
            try block(&_value)
        }
    }

    func reader<T>(block: (Value) throws -> T) rethrows -> T {
        try queue.sync {
            try block(_value)
        }
    }

    func writer(block: @escaping (inout Value) -> Void) -> Void {
        queue.async(flags: .barrier) {
            block(&self._value)
        }
    }
}

struct SynchronizedLock<Value> {
    private var _value: Value
    private let lock = NSLock()

    init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { lock.synchronized { _value } }
        set { lock.synchronized { _value = newValue } }
    }

    mutating func synchronized<T>(block: (inout Value) throws -> T) rethrows -> T {
        try lock.synchronized {
            try block(&_value)
        }
    }
}

/// Unfair lock synchronization
///
/// - Warning: The documentation warns us: “In general, higher level synchronization primitives such as those provided by the pthread or dispatch subsystems should be preferred.”</quote>

class SynchronizedUnfairLock<Value> {
    private var _value: Value
    private var lock = os_unfair_lock()

    required init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { synchronized { $0 } }
        set { synchronized { $0 = newValue } }
    }

    func synchronized<T>(block: (inout Value) throws -> T) rethrows -> T {
        os_unfair_lock_lock(&lock)
        defer { os_unfair_lock_unlock(&lock) }
        return try block(&_value)
    }
}

struct SynchronizedSemaphore<Value> {
    private var _value: Value
    private let semaphore = DispatchSemaphore(value: 1)

    init(_ value: Value) {
        self._value = value
    }

    var value: Value {
        get { semaphore.waitAndSignal { _value } }
        set { semaphore.waitAndSignal { _value = newValue } }
    }

    mutating func synchronized<T>(block: (inout Value) throws -> T) rethrows -> T {
        try semaphore.waitAndSignal {
            try block(&_value)
        }
    }
}

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}

extension DispatchSemaphore {
    func waitAndSignal<T>(block: () throws -> T) rethrows -> T {
        wait()
        defer { signal() }
        return try block()
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...