Выполнение кода, который заблокирован, должно занять больше времени, чем его разблокированный аналог - PullRequest
0 голосов
/ 28 ноября 2018

Я пытаюсь поэкспериментировать с потоками и блокировками Python.Итак, я создал 2 класса.Оба эти класса используют потоки для увеличения и уменьшения переменной уровня класса 'ref'.

В ThreadUnsafeClass я не использую блокировку перед увеличением и уменьшением.

В ThreadSafeClass я использую блокировкудо увеличения и уменьшения.

Я предположил, что поскольку блокировка заставит некоторые потоки ждать, в случае ThreadSafeClass это займет больше времени.

Результаты указывают на то, что ThreadSafeClass быстрее.

Этомой код (python 2.7)

import threading
import time


class ThreadUnsafeClass(object):
ref = 0

def __init__(self, count_tot=10000):
    self.all_threads = []
    self.count_tot = count_tot
    ThreadUnsafeClass.ref = 0

def inc_ref(self):
    time.sleep(0.1)
    for i in xrange(0, self.count_tot):
        ThreadUnsafeClass.ref += 1

def dec_ref(self):
    time.sleep(0.1)
    for i in xrange(0, self.count_tot):
        ThreadUnsafeClass.ref -= 1


def compute_ref_value(self):
    start_time = time.time()
    for i in xrange(0, 50):
        t1 = threading.Thread(target=self.inc_ref, args=())
        t2 = threading.Thread(target=self.dec_ref, args=())
        t1.start()
        t2.start()
        self.all_threads.append(t1)
        self.all_threads.append(t2)

    for t in self.all_threads:
        t.join()

    print time.time() - start_time, " -> ",

    return ThreadUnsafeClass.ref

class ThreadSafeClass(object):
ref = 0

def __init__(self, count_tot=10000):
    self.all_threads = []
    self.count_tot = count_tot
    ThreadUnsafeClass.ref = 0
    self.lock = threading.Lock()

def inc_ref(self):
    time.sleep(0.1)
    self.lock.acquire()
    for i in xrange(0, self.count_tot):
        ThreadUnsafeClass.ref += 1
    self.lock.release()

def dec_ref(self):
    time.sleep(0.1)
    self.lock.acquire()
    for i in xrange(0, self.count_tot):
        ThreadUnsafeClass.ref -= 1
    self.lock.release()


def compute_ref_value(self):
    start_time = time.time()
    for i in xrange(0, 50):
        t1 = threading.Thread(target=self.inc_ref, args=())
        t2 = threading.Thread(target=self.dec_ref, args=())
        t1.start()
        t2.start()
        self.all_threads.append(t1)
        self.all_threads.append(t2)

    for t in self.all_threads:
        t.join()

    print time.time() - start_time, " -> ",

    return ThreadUnsafeClass.ref

thread_unsafe_class = ThreadUnsafeClass(100000)
print "Value from un-safe threading ", 
thread_unsafe_class.compute_ref_value()

thread_safe_class = ThreadSafeClass(100000)
print "Value from safe threading ", thread_safe_class.compute_ref_value()

Вот мои результаты:

Значение из небезопасных потоков 3.54868483543 -> 30653

Значение из безопасных потоков 2.28372502327 -> 0

Пожалуйста, помогите мне понять, почему блокировка быстрее!

1 Ответ

0 голосов
/ 28 ноября 2018

Я полагаю, что ответ заключается в том, что, блокируя то, как работает ваш код, вы фактически избегаете переполнения потоков и кэша, что делает его более быстрым, поскольку цикл для каждого потока может завершаться без какого-либо другого конфликта аппаратных ресурсов.На самом деле это не сравнение яблок и яблок, но, переместив блокировку в цикл, а не за его пределы:

def inc_ref(self):
    time.sleep(0.1)
    for i in xrange(0, self.count_tot):
        self.lock.acquire()
        ThreadUnsafeClass.ref += 1
        self.lock.release()

def dec_ref(self):
    time.sleep(0.1)
    for i in xrange(0, self.count_tot):
        self.lock.acquire()
        ThreadUnsafeClass.ref -= 1
        self.lock.release()

Я обнаружил, что время выполнения резко увеличилось (как вы и ожидали).

Для дальнейшего тестирования этой теории я взял ваш код и добавил несколько более подробных временных параметров, чтобы точно определить, сколько времени потребовалось операциям увеличения / уменьшения по сравнению с блокировкой:

import threading
import time
import operator

class ThreadUnsafeClass(object):
    ref = 0

    def __init__(self, count_tot=10000):
        self.all_threads = []
        self.count_tot = count_tot
        ThreadUnsafeClass.ref = 0

    def inc_ref(self, ndx):
        time.sleep(0.1)
        ref_time = 0
        for i in xrange(0, self.count_tot):
            op_start = time.time()
            ThreadUnsafeClass.ref += 1
            ref_time += time.time() - op_start
        self.op_times[ndx] = ref_time

    def dec_ref(self, ndx):
        time.sleep(0.1)
        ref_time = 0
        for i in xrange(0, self.count_tot):
            op_start = time.time()
            ThreadUnsafeClass.ref -= 1
            ref_time += time.time() - op_start
        self.op_times[ndx] = ref_time


    def compute_ref_value(self):
        start_time = time.time()
        self.op_times = [0]*100
        for i in xrange(0, 50):
            t1 = threading.Thread(target=self.inc_ref, args=(i*2,))
            t2 = threading.Thread(target=self.dec_ref, args=(i*2+1,))
            t1.start()
            t2.start()
            self.all_threads.append(t1)
            self.all_threads.append(t2)

        for t in self.all_threads:
            t.join()

        op_total = reduce(operator.add, self.op_times)

        print time.time() - start_time, op_total, " -> ",

        return ThreadUnsafeClass.ref

class ThreadSafeClass(object):
    ref = 0

    def __init__(self, count_tot=10000):
        self.all_threads = []
        self.count_tot = count_tot
        ThreadUnsafeClass.ref = 0
        self.lock = threading.Lock()

    def inc_ref(self, ndx):
        time.sleep(0.1)
        lock_start = time.time()
        self.lock.acquire()
        lock_time = time.time() - lock_start
        ref_time = 0
        for i in xrange(0, self.count_tot):
            op_start = time.time()
            ThreadUnsafeClass.ref += 1
            ref_time += time.time() - op_start
        self.lock.release()
        self.op_times[ndx] = ref_time
        self.lock_times[ndx] = lock_time

    def dec_ref(self, ndx):
        time.sleep(0.1)
        lock_start = time.time()
        self.lock.acquire()
        lock_time = time.time() - lock_start
        ref_time = 0
        for i in xrange(0, self.count_tot):
            op_start = time.time()
            ThreadUnsafeClass.ref -= 1
            ref_time += time.time() - op_start
        self.lock.release()
        self.op_times[ndx] = ref_time
        self.lock_times[ndx] = lock_time


    def compute_ref_value(self):
        start_time = time.time()
        self.op_times = [0]*100
        self.lock_times = [0]*100
        for i in xrange(0, 50):
            t1 = threading.Thread(target=self.inc_ref, args=(i*2,))
            t2 = threading.Thread(target=self.dec_ref, args=(i*2+1,))
            t1.start()
            t2.start()
            self.all_threads.append(t1)
            self.all_threads.append(t2)

        for t in self.all_threads:
            t.join()

        op_total = reduce(operator.add, self.op_times)
        lock_total = reduce(operator.add, self.lock_times)

        print time.time() - start_time, op_total, lock_total, " -> ",

        return ThreadUnsafeClass.ref

thread_unsafe_class = ThreadUnsafeClass(100000)
print "Value from un-safe threading ", thread_unsafe_class.compute_ref_value()

thread_safe_class = ThreadSafeClass(100000)
print "Value from safe threading ", thread_safe_class.compute_ref_value()

Вывод был:

Value from un-safe threading  6.93944501877 297.449399471  ->  13057
Value from safe threading  4.08318996429 2.6313662529 197.359120131  ->  0

Показывает, что кумулятивное время только для приращения и уменьшения (по всем потокам) составило почти 300 секунд для случая без блокировки, но менее 3 секунд для случая блокировки.Для случая блокировки действительно потребовалось почти 200 (кумулятивных) секунд, чтобы получить блокировку для всех потоков, но общее время блокировки и приращения / уменьшения в этом случае все еще меньше.

Избиение происходит из-за того, что при совместном использованиипамять, доступ к которой осуществляется несколькими потоками, работающими на нескольких процессорах (как это имеет почти каждая система в наши дни), аппаратное обеспечение должно координировать доступ к этой общей памяти между каждым процессором, а также при наличии нескольких повторных обращений к одной и той же памяти (или памяти)в одной и той же строке кэша) одновременно из разных источников, процессоры тратят нетривиальное количество времени на ожидание друг друга.

Когда вы вводите блокировку, вы тратите время на ожидание блокировки, но внутриБлокировка каждого потока / ЦП имеет эксклюзивный доступ к общей памяти, поэтому нет дополнительных затрат на координацию одновременного доступа из нескольких ЦП.

...