Как правильно опубликовать и подписаться на последнее сообщение, используя pyzmq? - PullRequest
3 голосов
/ 17 мая 2019

У меня есть процесс A, который постоянно публикует сообщение, а процессы B и C подписываются на тему и получают последнее сообщение, опубликованное издателем в процессе A.

Итак, я установил zmq.CONFLATE для обоихиздатель и подписчик.Однако я обнаружил, что один подписчик не мог получать сообщения.

def publisher(sleep_time=1.0, port="5556"):

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.bind("tcp://*:%s" % port)
    print ("Running publisher on port: ", port)

    while True:
        localtime = time.asctime( time.localtime(time.time()))
        string = "Message published time: {}".format(localtime)
        socket.send_string("{}".format(string))
        time.sleep(sleep_time)

def subscriber(name="sub", sleep_time=1, ports="5556"):

    print ("Subscriber Name: {}, Sleep Time: {}, Port: {}".format(name, sleep_time, ports))

    context = zmq.Context()
    print ("Connecting to publisher with ports %s" % ports)
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
    socket.connect ("tcp://localhost:%s" % ports)

    while True:

        message = socket.recv()
        localtime = time.asctime( time.localtime(time.time()))
        print ("\nSubscriber [{}]\n[RECV]: {} at [TIME]: {}".format(name, message, localtime))
        time.sleep(sleep_time)


if __name__ == "__main__":
    Process(target=publisher).start()
    Process(target=subscriber, args=("SUB1", 1.2, )).start()
    Process(target=subscriber, args=("SUB2", 1.1, )).start()

Я попытался сбросить socket.setsockopt(zmq.CONFLATE, 1) в издателе, и это, похоже, решило проблему.Оба подписчика в процессах B и C могли получать сообщения, и эти сообщения казались самыми последними.

Я пытаюсь выяснить, почему установка издателя с CONFLATE вызвала проблему, с которой я столкнулся.Я не мог найти информацию об этом.Кто-нибудь знает, что вызывает такое поведение?

Кроме того, я хочу знать, в случае одного издателя для нескольких подписчиков, какова правильная настройка кода, чтобы подписчик всегда мог получать последние сообщения?

Ответы [ 2 ]

1 голос
/ 20 мая 2019

Скорее всего, это проблема синхронизации, опция сокета ZMQ_CONFLATE ограничивает входящую и исходящую очередь 1 сообщением.

Принцип работы PUB / SUB заключается в том, что подписчик отправляет подписчику сообщение о подписке, когда вы устанавливаете опцию ZMQ_SUBSCRIBE.Если вы одновременно запускаете обоих подписчиков, то возможно, что одно из сообщений подписки, поступивших в очередь издателя, будет отброшено.

Попробуйте добавить спящий режим между запуском каждого подписчика.

Из документов zeromq

Если установлено, сокет должен хранить только одно сообщение в своей входящей / исходящей очереди, причем это сообщение является последним полученным / последним отправленным сообщением.Игнорирует параметры ZMQ_RCVHWM и ZMQ_SNDHWM.Не поддерживает многокомпонентные сообщения, в частности, только одна его часть хранится во внутренней очереди сокетов.

Я не говорю, что это решение вашей проблемы, но если этов этом случае нам может потребоваться опубликовать изменение в libzmq, чтобы сделать параметры сопоставления более детальными, чтобы вы могли выбрать, следует ли применять сопоставление к входящим или исходящим очередям.

0 голосов
/ 21 мая 2019

Есть способ получить опцию " Только последнее сообщение " в ZMQ Сокете подписки (с использованием опции CONFLATE).

Вам это нужно на стороне абонента.

Вот пример:

import zmq

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)

socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.CONFLATE, 1)  # last msg only.
socket.connect("tcp://localhost:%s" % port)  # must be placed after above options.

while True:
    data = socket.recv()
    print data

С другой стороны, Я удалил все буферизованные очереди в коде подписчика.


[ В дополнительном ]:

С опциями zmq.SNDBUF и zmq.RCVBUF мы можем установить ограничение на размер буфера ZMQ. ( Более полный и пример )


...