Многопроцессорная обработка с обновляемой очередью - PullRequest
2 голосов
/ 24 августа 2009

Я пытаюсь выяснить, как написать программу на python, которая использует многопроцессорную очередь.

У меня есть несколько серверов, и один из них удаленно предоставит очередь следующим образом:

from multiprocessing.managers import BaseManager
import Queue
import daemonme

queue = Queue.Queue()

class QueueManager(BaseManager):
    pass

daemonme.createDaemon()
QueueManager.register('get_job', callable=lambda:queue)
m = QueueManager(address=('', 50000), authkey='')
s = m.get_server()
s.serve_forever()

Теперь я хочу использовать мой четырехъядерный сервер Xeon для обработки заданий из этой удаленной очереди. Работа полностью независима друг от друга. Поэтому, если у меня 8 ядер, я бы хотел запустить 7 процессов, которые выбирают задание из очереди, обрабатывают его, а затем возвращаются к следующему. Каждый из 7 процессов будет делать это, но я не могу полностью сосредоточиться на структуре этой программы.

Может ли кто-нибудь дать мне некоторые образованные идеи об основной структуре этого?

Заранее спасибо.

Ответы [ 2 ]

2 голосов
/ 24 августа 2009

Посмотрите в документе, как получить очередь из менеджера (пункт 17.6.2.7) чем при пуле (п. 17.6.2.9) работников запустить 7 заданий, передавая очередь в каждую.

в качестве альтернативы вы можете подумать о проблеме производителя / потребителя:

from multiprocessing.managers import BaseManager
import random

class Producer():
def __init__(self):
    BaseManager.register('queue')
    self.m = BaseManager(address=('hostname', 50000), authkey='jgsjgfdjs')
    self.m.connect()
    self.cm_queue = self.m.queue()
    while 1:
        time.sleep(random.randint(1,3))
        self.cm_queue.put(<PUT-HERE-JOBS>)

from multiprocessing.managers import BaseManager
import time
import random
class Consumer():
def __init__(self):
    BaseManager.register('queue')

    self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
    self.m.connect()
    self.queue = self.m.queue()
    while 1:
        <EXECUTE(job = self.queue.get())>


from multiprocessing.managers import BaseManager, Queue
class Manager():

def __init__(self):

    self.queue = QueueQueu()

    BaseManager.register('st_queue', callable=lambda:self.queue)

    self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
    self.s = self.m.get_server()

    self.s.serve_forever()
0 голосов
/ 24 августа 2009

Вы должны использовать шаблон «ведущий-ведомый» (он же фермер-рабочий). Первоначальный процесс будет основным и создает рабочие места. Это

  1. создает очередь
  2. создает 7 подчиненных процессов, передавая очередь в качестве параметра
  3. начинает запись заданий в очередь

Подчиненные процессы постоянно читают из очереди и выполняют задания (возможно, пока не получат сообщение остановки из очереди). В этом сценарии нет необходимости использовать объекты Manager, AFAICT.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...