zeromq: как предотвратить бесконечное ожидание? - PullRequest
64 голосов
/ 24 сентября 2011

Я только начал работать с ZMQ. Я разрабатываю приложение, рабочий процесс которого:

  1. один из многих клиентов (имеющих случайные PULL-адреса) отправляет запрос на сервер в 5555
  2. сервер всегда ждет PUSHes клиента. Когда он приходит, рабочий процесс создается для этого конкретного запроса. Да, рабочие процессы могут существовать одновременно.
  3. Когда этот процесс завершает свою задачу, он передает результат клиенту.

Я предполагаю, что архитектура PUSH / PULL подходит для этого. Пожалуйста поправьте меня в этом.


Но как мне справиться с этими сценариями?

  1. client_receiver.recv () будет ждать бесконечное время, когда сервер не сможет ответить.
  2. клиент может отправить запрос, но сразу после этого произойдет сбой, следовательно, рабочий процесс останется на сервере server_sender.send () навсегда.

Так как мне настроить что-то вроде таймаута в модели PUSH / PULL?


РЕДАКТИРОВАТЬ : Спасибо пользователю user938949 за предложения, я получил рабочий ответ и делюсь им для потомков.

Ответы [ 4 ]

71 голосов
/ 24 сентября 2011

Если вы используете zeromq> = 3.0, вы можете установить опцию сокета RCVTIMEO:

client_receiver.RCVTIMEO = 1000 # in milliseconds

Но в целом вы можете использовать опросы:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

И poller.poll() занимает время ожидания:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

evts будет пустым списком, если нечего получать.

Вы можете опросить с помощью zmq.POLLOUT, чтобы проверить, будет ли отправка успешной.

Или, для обработки случая однорангового узла, который мог потерпеть неудачу, a:

worker.send(msg, zmq.NOBLOCK)

может быть достаточно, который всегда будет возвращаться немедленно, вызывая ZMQError (zmq.EAGAIN), если отправка не может быть завершена.

15 голосов
/ 26 сентября 2011

Это был быстрый взлом , который я сделал после того, как сослался на ответ пользователя 938949 и http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/. Если вы делаете лучше, пожалуйста, оставьте свой ответ, Я буду рекомендовать ваш ответ .

Для тех, кто хочет долгосрочных решений по надежности, см. http://zguide.zeromq.org/page:all#toc64

Версия 3.0 zeromq (бета-банкомата) поддерживает время ожидания в ZMQ_RCVTIMEO и ZMQ_SNDTIMEO. http://api.zeromq.org/3-0:zmq-setsockopt

Сервер

zmq.NOBLOCK гарантирует, что когда клиент не существует, send () не блокируется.

import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"

Клиент

Объект-поллер может прослушивать множество принимающих сокетов (см. Выше «Мультипроцессорную обработку Python с ZeroMQ». Я связал его только с work_receiver . В бесконечном цикле клиент опрашивает с интервалом 1000 мс. socks объект возвращается пустым, если за это время не было получено ни одного сообщения.

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"
8 голосов
/ 01 июня 2012

Блок отправки не будет, если вы используете ZMQ_NOBLOCK, но если вы попытаетесь закрыть сокет и контекст, этот шаг заблокирует выход программы.

Причина в том, что сокет ожидает любого узла, такисходящие сообщения должны быть поставлены в очередь. Чтобы немедленно закрыть сокет и очистить исходящие сообщения из буфера, используйте ZMQ_LINGER и установите его в 0 ..

0 голосов
/ 02 мая 2019

Если вы ожидаете только один сокет, а не создаете Poller, вы можете сделать это:

if work_receiver.poll(1000, zmq.POLLIN):
    print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
    print "error: message timeout"

Вы можете использовать это, если ваше время ожидания меняется в зависимости от ситуации, вместонастройка work_receiver.RCVTIMEO.

...