Многопроцессорная обработка Python: хорошо ли работает join () / task_done () с параллельными процессами? - PullRequest
0 голосов
/ 08 ноября 2018

Я скребу в Интернете, загружаю изображения, обрабатываю их, все время записываю все это в MySQL. Я использую SyncManager для создания Сервера и создания в нем присоединяемых очередей.

У меня есть скрипт для сервера, скребка, загрузчика, процессора изображений и процессора SQL. У каждого из них есть своя отдельная присоединяемая очередь. Я запускаю все сценарии одновременно (несколько экземпляров каждого - для скорости), и мне нужно синхронизировать все их выходные данные, поэтому я использую join () и task_done (), где это необходимо.

У меня следующий вопрос: предположим, я запускаю 10 экземпляров каждого скрипта. (Используя Downloader в качестве примера :) Ожидает ли join () одного экземпляра Downloader для элементов, которые этот экземпляр - и только этот экземпляр - помещает в очередь ImageProcessor ? (в то время как другие загрузчики также загружают очередь imgProc) Я бы ожидал, но я хочу быть уверен.

Я пытаюсь проверить это, но мне трудно увидеть все результаты. поэтому я подумал, что могу использовать подтверждение эксперта (я новичок в многопроцессорной обработке). Спасибо!

Вот код:
Вот так выглядит мой сервер:

from multiprocessing.managers import SyncManager
from multiprocessing import JoinableQueue

class QueueManager(SyncManager):
    pass

class Server(Process):
    def __init__(self, ip, port, authkey):
        super(Server, self).__init__()
        self.ip = ip
        self.port = port
        self.authkey = authkey

    def make_server_manager(self):
        q_search = JoinableQueue() #input for the scraper
        q_imgProc = JoinableQueue() #input for the Image Processor
        q_sql = JoinableQueue() #input queue to the sqlServer
        q_download = JoinableQueue() #queue for the Downloader
        QueueManager.register('q_search', callable=lambda: q_search)
        QueueManager.register('q_imgProc', callable=lambda: q_imgProc)
        QueueManager.register('q_sql', callable=lambda: q_sql)
        QueueManager.register('q_download', callable=lambda: q_download)
        QueueManager.register('rootFolder', callable = lambda:"some dir")
        manager = QueueManager(address=(self.ip, self.port), authkey=self.authkey)
        print('Server started at port ' + str(self.port))
        return manager

if __name__ == "__main__":
    ip = '127.0.0.1'
    port = 24487
    authkey = b'abc'

    s = Server(ip,port,authkey)
    manager = s.make_server_manager()
    s = manager.get_server()
    s.serve_forever()

Вот как выглядят другие скрипты (т.е. Загрузчик):

from multiprocessing.managers import SyncManager

class QueueManager(SyncManager):
    pass

class Downloader():

    def __init__(self,ip,port,authkey):
        [Initialize all the variables as they are defined in the Server]

     def SomeFctThatDoesStuff(self,...):
         [work]

         #send sth to the Image Processor
         For image in images:
             self.q_imgProc.put('message')
         self.q_imgProc.join()

if __name__ == "__main__":
    ip = '127.0.0.1'
    port = 24487
    authkey = b'abc'
    downloader = Downloader(ip, port, authkey)

    while True:
        rcvd = downloader.q_download.get()
        downloader.q_download.task_done()

Очевидно, что imgProc и все соответствующие другие процессы также имеют task_done() в конце. (питон 3,6)

...