0mq соединение один-ко-многим - PullRequest
4 голосов
/ 24 августа 2011

Какой самый правильный способ установить двустороннюю связь между процессами, используя 0mq?Мне нужно создать несколько фоновых процессов, которые будут ждать команды от основного процесса, выполнять некоторые вычисления и возвращать результат обратно в основной процесс.

Ответы [ 2 ]

7 голосов
/ 24 августа 2011

Есть несколько способов сделать это. Наиболее простой подход - использовать сокеты REQ / REP. Каждый фоновый процесс / рабочий будет иметь сокет REP, и вы будете использовать сокет REQ для связи с ними:

import zmq

def worker(addr):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(addr)
    while True:
        # get message from boss
        msg = socket.recv()
        # ...do smth
        # send back results
        socket.send(msg)

if __name__ == '__main__':
    # spawn 5 workers
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()

Вам нужно будет подключиться к каждому работнику, чтобы отправить ему сообщение и получить результаты:

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(worker_addr)
socket.send('message')
msg = socket.recv()

Другой подход заключается в использовании PUB / SUB для отправки сообщений рабочим и PUSH / PULL для сбора результатов:

import zmq

def worker(worker_id, publisher_addr, results_addr):
    context = zmq.Context()
    sub = context.socket(zmq.SUB)
    sub.connect(publisher_addr)
    sub.setsockopt(zmq.SUBSCRIBE, worker_id)
    push = context.socket(zmq.PUSH)
    push.connect(results_addr)

    while True:
        msg = sub.recv_multipart()[1]
        # do smth, send off results
        push.send_multipart([worker_id, msg])

if __name__ == '__main__':
    publisher_addr = 'tcp://127.0.0.1:5000'
    results_addr = 'tcp://127.0.0.1:5001'

    # launch some workers into space
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('worker-%d' % i, publisher_addr, results_addr,)).start()

Чтобы передать команду конкретному работнику, вы должны сделать что-то вроде:

context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind(publisher_addr)
# send message to worker-1
pub.send_multipart(['worker-1', 'hello'])

Результаты поиска:

context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind(results_addr)

while True:
    worker_id, result = pull.recv_multipart()
    print worker_id, result
3 голосов
/ 24 августа 2011

Попробуйте использовать Запрос брокера ответов , но обменяйте сокет REQ на ДИЛЕРА.Дилер не блокирует отправку и автоматически загружает баланс трафика на ваших работников.

На картинке Client будет вашим main process, а Service A/B/C вашим background processes (workers).Main process должен связываться с конечной точкой.Workers должен подключаться к конечной точке основного процесса для получения рабочих элементов.

В main process ведется список рабочих элементов и время отправки.Если в течение некоторого времени ответа нет, просто повторно отправьте рабочий элемент, поскольку worker вероятно умер.

Request Reply Broker

...