У меня есть приложение с привязкой к процессору, которое я хочу ускорить, используя многопроцессорную обработку + многопоточность вместо использования чистой версии с многопоточностью. Я написал простое приложение для проверки производительности моего подхода и с удивлением увидел, что многопроцессорная и многопроцессорная + многопоточные версии работают хуже, чем как многопоточная, так и последовательная версии.
В моем приложении есть рабочая очередь, в которой хранится вся работа. Затем потоки выскакивают по одному рабочему элементу за раз, а затем обрабатывают его либо напрямую (многопоточная версия), либо передавая его в процесс. Затем поток должен дождаться получения результата, прежде чем перейти к следующей итерации. Причина, по которой мне нужно извлекать по одному рабочему элементу за раз, заключается в том, что работа является динамической (это не относится к коду приложения-прототипа, вставленному ниже), и я не могу предварительно разделить работу и передать ее каждому потоку / процессу во время создания .
Я хотел бы знать, что я делаю неправильно и как я могу ускорить свое приложение.
Вот время выполнения, когда я работал на 16-ядерном компьютере:
Version : 2.7.2
Compiler : GCC 4.1.2 20070925 (Red Hat 4.1.2-33)
Platform : Linux-2.6.24-perfctr-x86_64-with-fedora-8-Werewolf
Processor : x86_64
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 3505.97214699 ms
mainPureMultiprocessing exec time: 2241.89805984 ms
mainPureThreaded exec time: 309.767007828 ms
mainSerial exec time: 52.3412227631 ms
Terminating
и вот код, который я использовал:
import threading
import multiprocessing
import time
import platform
class ConcurrentQueue:
def __init__(self):
self.data = []
self.lock = threading.Lock()
def push(self, item):
self.lock.acquire()
try:
self.data.append(item)
finally:
self.lock.release()
return
def pop(self):
self.lock.acquire()
result = None
try:
length = len(self.data)
if length > 0:
result = self.data.pop()
finally:
self.lock.release()
return result
def isEmpty(self, item):
self.lock.acquire()
result = 0
try:
result = len(self.data)
finally:
self.lock.release()
return result != 0
def timeFunc(passedFunc):
def wrapperFunc(*arg):
startTime = time.time()
result = passedFunc(*arg)
endTime = time.time()
elapsedTime = (endTime - startTime) * 1000
print passedFunc.__name__, 'exec time:', elapsedTime, " ms"
return result
return wrapperFunc
def checkPrime(candidate):
# dummy process to do some work
for k in xrange(3, candidate, 2):
if candidate % k:
return False
return True
def fillQueueWithWork(itemQueue, numItems):
for item in xrange(numItems, 2 * numItems):
itemQueue.push(item)
@timeFunc
def mainSerial(numItems):
jobQueue = ConcurrentQueue()
fillQueueWithWork(jobQueue, numItems)
while True:
dataItem = jobQueue.pop()
if dataItem is None:
break
# do work with dataItem
result = checkPrime(dataItem)
return
# Start: Implement a pure threaded version
def pureThreadFunc(jobQueue):
curThread = threading.currentThread()
while True:
dataItem = jobQueue.pop()
if dataItem is None:
break
# do work with dataItem
result = checkPrime(dataItem)
return
@timeFunc
def mainPureThreaded(numThreads, numItems):
jobQueue = ConcurrentQueue()
fillQueueWithWork(jobQueue, numItems)
workers = []
for index in xrange(numThreads):
loopName = "Thread-" + str(index)
loopThread = threading.Thread(target=pureThreadFunc, name=loopName, args=(jobQueue, ))
loopThread.start()
workers.append(loopThread)
for worker in workers:
worker.join()
return
# End: Implement a pure threaded version
# Start: Implement a pure multiprocessing version
def pureMultiprocessingFunc(jobQueue, resultQueue):
while True:
dataItem = jobQueue.get()
if dataItem is None:
break
# do work with dataItem
result = checkPrime(dataItem)
resultQueue.put_nowait(result)
return
@timeFunc
def mainPureMultiprocessing(numProcesses, numItems):
jobQueue = ConcurrentQueue()
fillQueueWithWork(jobQueue, numItems)
workers = []
queueSize = (numItems/numProcesses) + 10
for index in xrange(numProcesses):
jobs = multiprocessing.Queue(queueSize)
results = multiprocessing.Queue(queueSize)
loopProcess = multiprocessing.Process(target=pureMultiprocessingFunc, args=(jobs, results, ))
loopProcess.start()
workers.append((loopProcess, jobs, results))
processIndex = 0
while True:
dataItem = jobQueue.pop()
if dataItem is None:
break
workers[processIndex][1].put_nowait(dataItem)
processIndex += 1
if numProcesses == processIndex:
processIndex = 0
for worker in workers:
worker[1].put_nowait(None)
for worker in workers:
worker[0].join()
return
# End: Implement a pure multiprocessing version
# Start: Implement a threaded+multiprocessing version
def mpFunc(processName, jobQueue, resultQueue):
while True:
dataItem = jobQueue.get()
if dataItem is None:
break
result = checkPrime(dataItem)
resultQueue.put_nowait(result)
return
def mpThreadFunc(jobQueue):
curThread = threading.currentThread()
threadName = curThread.getName()
jobs = multiprocessing.Queue()
results = multiprocessing.Queue()
myProcessName = "Process-" + threadName
myProcess = multiprocessing.Process(target=mpFunc, args=(myProcessName, jobs, results, ))
myProcess.start()
while True:
dataItem = jobQueue.pop()
# put item to allow process to start
jobs.put_nowait(dataItem)
# terminate loop if work queue is empty
if dataItem is None:
break
# wait to get result from process
result = results.get()
# do something with result
return
@timeFunc
def mainMultiprocessAndThreaded(numThreads, numItems):
jobQueue = ConcurrentQueue()
fillQueueWithWork(jobQueue, numItems)
workers = []
for index in xrange(numThreads):
loopName = "Thread-" + str(index)
loopThread = threading.Thread(target=mpThreadFunc, name=loopName, args=(jobQueue, ))
loopThread.start()
workers.append(loopThread)
for worker in workers:
worker.join()
return
# End: Implement a threaded+multiprocessing version
if __name__ == '__main__':
print 'Version :', platform.python_version()
print 'Compiler :', platform.python_compiler()
print 'Platform :', platform.platform()
print 'Processor :', platform.processor()
numThreads = 8
numItems = 16000 #200000
print "Num Threads/Processes:", numThreads, "; Num Items:", numItems
mainMultiprocessAndThreaded(numThreads, numItems)
mainPureMultiprocessing(numThreads, numItems)
mainPureThreaded(numThreads, numItems)
mainSerial(numItems)
print "Terminating"
Редактировать: Одно из моих предположений о медлительности заключается в том, что Queue.put () заняты ожиданием вместо отказа от GIL. Если да, то какие предложения по альтернативной структуре данных я должен использовать?