Проблемы с ускорением приложения с помощью Multiprocessing + Threads в Python - PullRequest
2 голосов
/ 22 января 2012

У меня есть приложение с привязкой к процессору, которое я хочу ускорить, используя многопроцессорную обработку + многопоточность вместо использования чистой версии с многопоточностью. Я написал простое приложение для проверки производительности моего подхода и с удивлением увидел, что многопроцессорная и многопроцессорная + многопоточные версии работают хуже, чем как многопоточная, так и последовательная версии.

В моем приложении есть рабочая очередь, в которой хранится вся работа. Затем потоки выскакивают по одному рабочему элементу за раз, а затем обрабатывают его либо напрямую (многопоточная версия), либо передавая его в процесс. Затем поток должен дождаться получения результата, прежде чем перейти к следующей итерации. Причина, по которой мне нужно извлекать по одному рабочему элементу за раз, заключается в том, что работа является динамической (это не относится к коду приложения-прототипа, вставленному ниже), и я не могу предварительно разделить работу и передать ее каждому потоку / процессу во время создания .

Я хотел бы знать, что я делаю неправильно и как я могу ускорить свое приложение.

Вот время выполнения, когда я работал на 16-ядерном компьютере:

Version      : 2.7.2
Compiler     : GCC 4.1.2 20070925 (Red Hat 4.1.2-33)
Platform     : Linux-2.6.24-perfctr-x86_64-with-fedora-8-Werewolf
Processor    : x86_64
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 3505.97214699  ms
mainPureMultiprocessing exec time: 2241.89805984  ms
mainPureThreaded exec time: 309.767007828  ms
mainSerial exec time: 52.3412227631  ms
Terminating

и вот код, который я использовал:

import threading
import multiprocessing
import time
import platform

class ConcurrentQueue:
    def __init__(self):
        self.data = []
        self.lock = threading.Lock()

    def push(self, item):
        self.lock.acquire()
        try:
            self.data.append(item)
        finally:
            self.lock.release()
        return

    def pop(self):
        self.lock.acquire()
        result = None
        try:
            length = len(self.data)
            if length > 0:
                result = self.data.pop()
        finally:
            self.lock.release()
        return result

    def isEmpty(self, item):
        self.lock.acquire()
        result = 0
        try:
            result = len(self.data)
        finally:
            self.lock.release()
        return result != 0


def timeFunc(passedFunc):
    def wrapperFunc(*arg):
        startTime = time.time()
        result = passedFunc(*arg)
        endTime = time.time()
        elapsedTime = (endTime - startTime) * 1000
        print passedFunc.__name__, 'exec time:', elapsedTime, " ms"
        return result
    return wrapperFunc

def checkPrime(candidate):
    # dummy process to do some work
    for k in xrange(3, candidate, 2):
        if candidate % k:
            return False
    return True

def fillQueueWithWork(itemQueue, numItems):
    for item in xrange(numItems, 2 * numItems):
        itemQueue.push(item)


@timeFunc
def mainSerial(numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return

# Start: Implement a pure threaded version
def pureThreadFunc(jobQueue):
    curThread = threading.currentThread()
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return

@timeFunc
def mainPureThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=pureThreadFunc, name=loopName, args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)

    for worker in workers:
        worker.join()

    return
# End: Implement a pure threaded version

# Start: Implement a pure multiprocessing version
def pureMultiprocessingFunc(jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return

@timeFunc
def mainPureMultiprocessing(numProcesses, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    workers = []
    queueSize = (numItems/numProcesses) + 10
    for index in xrange(numProcesses):
        jobs = multiprocessing.Queue(queueSize)
        results = multiprocessing.Queue(queueSize)
        loopProcess = multiprocessing.Process(target=pureMultiprocessingFunc, args=(jobs, results, ))
        loopProcess.start()
        workers.append((loopProcess, jobs, results))

    processIndex = 0
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        workers[processIndex][1].put_nowait(dataItem)

        processIndex += 1
        if numProcesses == processIndex:
            processIndex = 0

    for worker in workers:
        worker[1].put_nowait(None)

    for worker in workers:
        worker[0].join()

    return
# End: Implement a pure multiprocessing version

# Start: Implement a threaded+multiprocessing version
def mpFunc(processName, jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return

def mpThreadFunc(jobQueue):
    curThread = threading.currentThread()
    threadName = curThread.getName()

    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()

    myProcessName = "Process-" + threadName
    myProcess = multiprocessing.Process(target=mpFunc, args=(myProcessName, jobs, results, ))
    myProcess.start()

    while True:
        dataItem = jobQueue.pop()
        # put item to allow process to start
        jobs.put_nowait(dataItem)
        # terminate loop if work queue is empty
        if dataItem is None:
            break
        # wait to get result from process
        result = results.get()
        # do something with result
    return

@timeFunc
def mainMultiprocessAndThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=mpThreadFunc, name=loopName, args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)

    for worker in workers:
        worker.join()

    return
# End: Implement a threaded+multiprocessing version

if __name__ == '__main__':

    print 'Version      :', platform.python_version()
    print 'Compiler     :', platform.python_compiler()
    print 'Platform     :', platform.platform()
    print 'Processor    :', platform.processor()

    numThreads = 8
    numItems = 16000 #200000

    print "Num Threads/Processes:", numThreads, "; Num Items:", numItems

    mainMultiprocessAndThreaded(numThreads, numItems)
    mainPureMultiprocessing(numThreads, numItems)
    mainPureThreaded(numThreads, numItems)
    mainSerial(numItems)

    print "Terminating"

Редактировать: Одно из моих предположений о медлительности заключается в том, что Queue.put () заняты ожиданием вместо отказа от GIL. Если да, то какие предложения по альтернативной структуре данных я должен использовать?

Ответы [ 2 ]

5 голосов
/ 22 января 2012

Похоже, вычислительные затраты каждого элемента не перевешивают накладные расходы, связанные с отправкой работы другому потоку / процессу.Например, вот результаты, которые я вижу, когда запускаю тестовое приложение на моей машине (очень похоже на ваши результаты):

Version      : 2.7.1
Compiler     : MSC v.1500 32 bit (Intel)
Platform     : Windows-7-6.1.7601-SP1
Processor    : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 1134.00006294  ms
mainPureMultiprocessing exec time: 917.000055313  ms
mainPureThreaded exec time: 111.000061035  ms
mainSerial exec time: 41.0001277924  ms
Terminating

Если я изменю выполняемую работу на что-то более дорого вычислительноеНапример:

def checkPrime(candidate):
    i = 0;
    for k in xrange(1,10000):
        i += k
    return i < 5000

Затем я вижу результаты, которые в большей степени соответствуют ожидаемым результатам:

Version      : 2.7.1
Compiler     : MSC v.1500 32 bit (Intel)
Platform     : Windows-7-6.1.7601-SP1
Processor    : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 2190.99998474  ms
mainPureMultiprocessing exec time: 2154.99997139  ms
mainPureThreaded exec time: 16170.0000763  ms
mainSerial exec time: 9143.00012589  ms
Terminating

Возможно, вы также захотите взглянуть на multiprocessing.Pool.Он обеспечивает модель, аналогичную описанной вами (несколько рабочих процессов извлекают задания из общей очереди).Для вашего примера реализация может выглядеть примерно так:

@timeFunc
def mainPool(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    pool = multiprocessing.Pool(processes=numThreads)
    results = []
    while True:
        dataItem = jobQueue.pop()
        if dataItem == None:
            break
        results.append(pool.apply_async(checkPrime, dataItem))

    pool.close()
    pool.join()

На моей машине с альтернативной реализацией checkPrime я вижу результат:

Version      : 2.7.1
Compiler     : MSC v.1500 32 bit (Intel)
Platform     : Windows-7-6.1.7601-SP1
Processor    : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 1600
mainPool exec time: 1530.99989891  ms
Terminating

Так какmultiprocessing.Pool уже обеспечивает безопасный доступ для вставки работы, вы, вероятно, можете исключить вашу ConcurrentQueue и вставить свою динамическую работу непосредственно в Pool.

2 голосов
/ 22 января 2012

Кажется, что ваша функция недостаточно интенсивна в вычислительном отношении, чтобы перевесить издержки многопроцессорной обработки. (Обратите внимание, что в Python MultiThreading не увеличивает ваши вычислительные ресурсы из-за GIL).

Ваша функция (checkPrime) на самом деле не проверяет простоту, скорее она возвращается очень быстро, заменяя ее простой (и наивной) простейшей проверкой, результат такой, как и ожидалось.

Однако посмотрите на . Используйте Python pool.map, чтобы несколько процессов выполняли операции со списком , чтобы увидеть простое использование многопроцессорной обработки. Обратите внимание, что есть встроенные типы для выполнения задачи вашей очереди, такие как очередь, см. http://docs.python.org/library/multiprocessing.html#multiprocessing-managers

def checkPrime(candidate):
    # dummy process to do some work
    for k in xrange(3, candidate):
        if not candidate % k:
            return False
    return True

и пример «быстрой» реализации:

@timeFunc
def speedy(numThreads,numItems):
    pool = multiprocessing.Pool(numThreads) #note the default will use the optimal number of workers

    for i in xrange(numItems, 2 * numItems):
        pool.apply_async(checkPrime,i)
    pool.close()
    pool.join()

Что почти в два раза быстрее!

wdolphin@Cory-linuxlaptop:~$ python test.py 
Version      : 2.6.6
Compiler     : GCC 4.4.5
Platform     : Linux-2.6.35-32-generic-x86_64-with-Ubuntu-10.10-maverick
Processor    : 
Num Threads/Processes: 8 ; Num Items: 16000
mainSerial exec time: 5555.76992035  ms
mainMultiprocessAndThreaded exec time: 4721.43602371  ms
mainPureMultiprocessing exec time: 4440.83094597  ms
mainPureThreaded exec time: 10829.3449879  ms
speedy exec time: 1898.72503281  ms
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...