Я полагаю, что ответ заключается в том, что, блокируя то, как работает ваш код, вы фактически избегаете переполнения потоков и кэша, что делает его более быстрым, поскольку цикл для каждого потока может завершаться без какого-либо другого конфликта аппаратных ресурсов.На самом деле это не сравнение яблок и яблок, но, переместив блокировку в цикл, а не за его пределы:
def inc_ref(self):
time.sleep(0.1)
for i in xrange(0, self.count_tot):
self.lock.acquire()
ThreadUnsafeClass.ref += 1
self.lock.release()
def dec_ref(self):
time.sleep(0.1)
for i in xrange(0, self.count_tot):
self.lock.acquire()
ThreadUnsafeClass.ref -= 1
self.lock.release()
Я обнаружил, что время выполнения резко увеличилось (как вы и ожидали).
Для дальнейшего тестирования этой теории я взял ваш код и добавил несколько более подробных временных параметров, чтобы точно определить, сколько времени потребовалось операциям увеличения / уменьшения по сравнению с блокировкой:
import threading
import time
import operator
class ThreadUnsafeClass(object):
ref = 0
def __init__(self, count_tot=10000):
self.all_threads = []
self.count_tot = count_tot
ThreadUnsafeClass.ref = 0
def inc_ref(self, ndx):
time.sleep(0.1)
ref_time = 0
for i in xrange(0, self.count_tot):
op_start = time.time()
ThreadUnsafeClass.ref += 1
ref_time += time.time() - op_start
self.op_times[ndx] = ref_time
def dec_ref(self, ndx):
time.sleep(0.1)
ref_time = 0
for i in xrange(0, self.count_tot):
op_start = time.time()
ThreadUnsafeClass.ref -= 1
ref_time += time.time() - op_start
self.op_times[ndx] = ref_time
def compute_ref_value(self):
start_time = time.time()
self.op_times = [0]*100
for i in xrange(0, 50):
t1 = threading.Thread(target=self.inc_ref, args=(i*2,))
t2 = threading.Thread(target=self.dec_ref, args=(i*2+1,))
t1.start()
t2.start()
self.all_threads.append(t1)
self.all_threads.append(t2)
for t in self.all_threads:
t.join()
op_total = reduce(operator.add, self.op_times)
print time.time() - start_time, op_total, " -> ",
return ThreadUnsafeClass.ref
class ThreadSafeClass(object):
ref = 0
def __init__(self, count_tot=10000):
self.all_threads = []
self.count_tot = count_tot
ThreadUnsafeClass.ref = 0
self.lock = threading.Lock()
def inc_ref(self, ndx):
time.sleep(0.1)
lock_start = time.time()
self.lock.acquire()
lock_time = time.time() - lock_start
ref_time = 0
for i in xrange(0, self.count_tot):
op_start = time.time()
ThreadUnsafeClass.ref += 1
ref_time += time.time() - op_start
self.lock.release()
self.op_times[ndx] = ref_time
self.lock_times[ndx] = lock_time
def dec_ref(self, ndx):
time.sleep(0.1)
lock_start = time.time()
self.lock.acquire()
lock_time = time.time() - lock_start
ref_time = 0
for i in xrange(0, self.count_tot):
op_start = time.time()
ThreadUnsafeClass.ref -= 1
ref_time += time.time() - op_start
self.lock.release()
self.op_times[ndx] = ref_time
self.lock_times[ndx] = lock_time
def compute_ref_value(self):
start_time = time.time()
self.op_times = [0]*100
self.lock_times = [0]*100
for i in xrange(0, 50):
t1 = threading.Thread(target=self.inc_ref, args=(i*2,))
t2 = threading.Thread(target=self.dec_ref, args=(i*2+1,))
t1.start()
t2.start()
self.all_threads.append(t1)
self.all_threads.append(t2)
for t in self.all_threads:
t.join()
op_total = reduce(operator.add, self.op_times)
lock_total = reduce(operator.add, self.lock_times)
print time.time() - start_time, op_total, lock_total, " -> ",
return ThreadUnsafeClass.ref
thread_unsafe_class = ThreadUnsafeClass(100000)
print "Value from un-safe threading ", thread_unsafe_class.compute_ref_value()
thread_safe_class = ThreadSafeClass(100000)
print "Value from safe threading ", thread_safe_class.compute_ref_value()
Вывод был:
Value from un-safe threading 6.93944501877 297.449399471 -> 13057
Value from safe threading 4.08318996429 2.6313662529 197.359120131 -> 0
Показывает, что кумулятивное время только для приращения и уменьшения (по всем потокам) составило почти 300 секунд для случая без блокировки, но менее 3 секунд для случая блокировки.Для случая блокировки действительно потребовалось почти 200 (кумулятивных) секунд, чтобы получить блокировку для всех потоков, но общее время блокировки и приращения / уменьшения в этом случае все еще меньше.
Избиение происходит из-за того, что при совместном использованиипамять, доступ к которой осуществляется несколькими потоками, работающими на нескольких процессорах (как это имеет почти каждая система в наши дни), аппаратное обеспечение должно координировать доступ к этой общей памяти между каждым процессором, а также при наличии нескольких повторных обращений к одной и той же памяти (или памяти)в одной и той же строке кэша) одновременно из разных источников, процессоры тратят нетривиальное количество времени на ожидание друг друга.
Когда вы вводите блокировку, вы тратите время на ожидание блокировки, но внутриБлокировка каждого потока / ЦП имеет эксклюзивный доступ к общей памяти, поэтому нет дополнительных затрат на координацию одновременного доступа из нескольких ЦП.