ноль mq pub / sub с multipart не работает - PullRequest
5 голосов
/ 05 июня 2011

Вот мой сценарий.


#!/usr/bin/env python

import traceback
import sys
import zmq
from time import sleep

print "Creating the zmq.Context"
context = zmq.Context()

print "Binding the publisher to the local socket at port 5557"
sender = context.socket(zmq.PUB)
sender.bind("tcp://*:5557")

print "Binding the subscriber to the local socket at port 5557"
receiver = context.socket(zmq.SUB)
receiver.connect("tcp://*:5557")

print "Setting the subscriber option to get only those originating from \"B\""
receiver.setsockopt(zmq.SUBSCRIBE, "B")

print "Waiting a second for the socket to be created."
sleep(1)

print "Sending messages"
for i in range(1,10):
    msg = "msg %d" % (i)
    env = None
    if i % 2 == 0:
        env = ["B", msg]
    else:
        env = ["A", msg]
    print "Sending Message:  ", env
    sender.send_multipart(env)

print "Closing the sender."
sender.close()

failed_attempts = 0
while failed_attempts < 3:
    try:
        print str(receiver.recv_multipart(zmq.NOBLOCK))
    except:
        print traceback.format_exception(*sys.exc_info())
        failed_attempts += 1 

print "Closing the receiver."
receiver.close()

print "Terminating the context."
context.term()

"""
Output:

Creating the zmq.Context
Binding the publisher to the local socket at port 5557
Binding the subscriber to the local socket at port 5557
Setting the subscriber option to get only those originating from "B"
Waiting a second for the socket to be created.
Sending messages
Sending Message:   ['A', 'msg 1']
Sending Message:   ['B', 'msg 2']
Sending Message:   ['A', 'msg 3']
Sending Message:   ['B', 'msg 4']
Sending Message:   ['A', 'msg 5']
Sending Message:   ['B', 'msg 6']
Sending Message:   ['A', 'msg 7']
Sending Message:   ['B', 'msg 8']
Sending Message:   ['A', 'msg 9']
Closing the sender.
['B', 'msg 2']
['B', 'msg 4']
['B', 'msg 6']
['B', 'msg 8']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
Closing the receiver.
Terminating the context.
"""

И вопрос в том ... почему этот код не работает?

[РЕДАКТИРОВАТЬ] Получив супер быстрый ответ в списке рассылки zeromq, я обновил код выше.

1 Ответ

10 голосов
/ 06 июня 2011

Credit: Chuck Remes

Вам может потребоваться «спящий режим» между этапами создания сокета (bind, connect, setsockopt) и фактической передачей сообщений.Операции bind & connect являются асинхронными, поэтому они могут не завершиться к тому времени, когда вы доберетесь до логики, которая отправляет все сообщения.В этом случае любые сообщения, отправленные через PUB-сокет, будут отброшены , поскольку операция zmq_bind () не создает очередь до тех пор, пока к ней не будет успешно подключен другой сокет.

Как примечаниеВам не нужно создавать 2 контекста в этом примере.Оба сокета могут быть созданы в одном контексте.Это не больно, но и не нужно.

Кредит: Питер

В конце главы 1 есть "решатель проблем", который объясняет это.

Некоторые типы сокетов (ROUTER и PUB) автоматически отбрасывают сообщения, для которых у них нет получателей.Соединение, как сказал Чак, асинхронно и занимает около 100 мсек.Если вы запустите два потока, свяжете одну сторону, подключите другую, а затем сразу же начнете отправлять данные через сокет такого типа, вы потеряете первые 100 мсек данных (приблизительно).жестокий вариант «докажи, что это работает».На самом деле вы могли бы синхронизироваться каким-либо образом или (более типично) ожидать потери сообщения как часть обычного запуска (то есть видеть опубликованные данные как чистую трансляцию без определенного начала или конца)., syncpub и syncsub для деталей.

...