Я скребу в Интернете, загружаю изображения, обрабатываю их, все время записываю все это в MySQL. Я использую SyncManager для создания Сервера и создания в нем присоединяемых очередей.
У меня есть скрипт для сервера, скребка, загрузчика, процессора изображений и процессора SQL. У каждого из них есть своя отдельная присоединяемая очередь. Я запускаю все сценарии одновременно (несколько экземпляров каждого - для скорости), и мне нужно синхронизировать все их выходные данные, поэтому я использую join () и task_done (), где это необходимо.
У меня следующий вопрос: предположим, я запускаю 10 экземпляров каждого скрипта. (Используя Downloader в качестве примера :) Ожидает ли join () одного экземпляра Downloader для элементов, которые этот экземпляр - и только этот экземпляр - помещает в очередь ImageProcessor ? (в то время как другие загрузчики также загружают очередь imgProc) Я бы ожидал, но я хочу быть уверен.
Я пытаюсь проверить это, но мне трудно увидеть все результаты. поэтому я подумал, что могу использовать подтверждение эксперта (я новичок в многопроцессорной обработке). Спасибо!
Вот код:
Вот так выглядит мой сервер:
from multiprocessing.managers import SyncManager
from multiprocessing import JoinableQueue
class QueueManager(SyncManager):
pass
class Server(Process):
def __init__(self, ip, port, authkey):
super(Server, self).__init__()
self.ip = ip
self.port = port
self.authkey = authkey
def make_server_manager(self):
q_search = JoinableQueue() #input for the scraper
q_imgProc = JoinableQueue() #input for the Image Processor
q_sql = JoinableQueue() #input queue to the sqlServer
q_download = JoinableQueue() #queue for the Downloader
QueueManager.register('q_search', callable=lambda: q_search)
QueueManager.register('q_imgProc', callable=lambda: q_imgProc)
QueueManager.register('q_sql', callable=lambda: q_sql)
QueueManager.register('q_download', callable=lambda: q_download)
QueueManager.register('rootFolder', callable = lambda:"some dir")
manager = QueueManager(address=(self.ip, self.port), authkey=self.authkey)
print('Server started at port ' + str(self.port))
return manager
if __name__ == "__main__":
ip = '127.0.0.1'
port = 24487
authkey = b'abc'
s = Server(ip,port,authkey)
manager = s.make_server_manager()
s = manager.get_server()
s.serve_forever()
Вот как выглядят другие скрипты (т.е. Загрузчик):
from multiprocessing.managers import SyncManager
class QueueManager(SyncManager):
pass
class Downloader():
def __init__(self,ip,port,authkey):
[Initialize all the variables as they are defined in the Server]
def SomeFctThatDoesStuff(self,...):
[work]
#send sth to the Image Processor
For image in images:
self.q_imgProc.put('message')
self.q_imgProc.join()
if __name__ == "__main__":
ip = '127.0.0.1'
port = 24487
authkey = b'abc'
downloader = Downloader(ip, port, authkey)
while True:
rcvd = downloader.q_download.get()
downloader.q_download.task_done()
Очевидно, что imgProc и все соответствующие другие процессы также имеют task_done()
в конце.
(питон 3,6)