Верхний знак Pyzmq не работает на пабе - PullRequest
0 голосов
/ 18 ноября 2018

Согласно документации ZeroMQ сокет паба должен отбрасывать сообщения, как только количество сообщений в очереди достигает максимальной отметки.

Это не работает в следующем примере (и да, я устанавливаю hwm перед привязкой / подключением):

import time
import pickle
from threading import Thread
import zmq

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.set_hwm(2)
    pub.bind('tcp://*:5555')

    i = 0
    while True:
        # Send message every 100ms
        time.sleep(0.1)
        pub.send_string("test", zmq.SNDMORE)
        pub.send_pyobj(i)
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.subscribe("test")
    sub.connect('tcp://localhost:5555')
    while True:
        # Receive messages only every second
        time.sleep(1)
        msg = sub.recv_multipart()
        print("Sub: %d" % pickle.loads(msg[1]))

t_pub = Thread(target=pub_thread)
t_sub = Thread(target=sub_thread)
t_pub.start()
t_sub.start()

while True:
    pass

Я отправляю сообщения в паб в 10 раз быстрее, чем читаю их во вспомогательном сокете, hwm установлен на 2. Я ожидал бы получать только каждое 10-е сообщение. Вместо этого я вижу следующий вывод:

Sub: 0
Sub: 1
Sub: 2
Sub: 3
Sub: 4
Sub: 5
Sub: 6
Sub: 7
Sub: 8
Sub: 9
Sub: 10
Sub: 11
Sub: 12
Sub: 13
Sub: 14
...

поэтому я вижу все поступающие сообщения, поэтому они удерживаются в некоторой очереди, пока я их не прочитаю. То же самое справедливо и при добавлении hwm = 2 в суб-сокет до подключения.

Что я делаю не так или я неправильно понимаю hwm?

Я использую pyzmq версия 17.1.2

1 Ответ

0 голосов
/ 18 ноября 2018

Заимствовав ответ на проблему , которую я открыл в Github , я обновил свой ответ следующим образом:


Сообщения хранятся в сети операционной системы.буферы.Из-за этого я обнаружил, что HWM не очень полезны.Вот модифицированный код, в котором подписчик пропускает сообщения:

import time
import pickle
import zmq
from threading import Thread
import os

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.SNDHWM, 2)
    pub.setsockopt(zmq.SNDBUF, 2*1024)  # See: http://api.zeromq.org/4-2:zmq-setsockopt
    pub.bind('tcp://*:5555')
    i = 0
    while True:
        time.sleep(0.001)
        pub.send_string(str(i), zmq.SNDMORE)
        pub.send(os.urandom(1024))
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    sub.setsockopt(zmq.RCVHWM, 2)
    sub.setsockopt(zmq.RCVBUF, 2*1024)
    sub.connect('tcp://localhost:5555')
    while True:
        time.sleep(0.1)
        msg, _ = sub.recv_multipart()
        print("Received:", msg.decode())

t_pub = Thread(target=pub_thread)
t_pub.start()
sub_thread()

Вывод выглядит примерно так:

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 47
Received: 48
Received: 64
Received: 65
Received: 84
Received: 85
Received: 159
Received: 160
Received: 270

Сообщения пропущены, поскольку все очереди / буферы заполнены ииздатель начинает сбрасывать сообщения (см. документацию для ZMQ_PUB: http://api.zeromq.org/4-2:zmq-socket).


[ NOTE ]:

  • Вы должны использовать высокийопция -water mark в слушателе / ​​подписчике и рекламодателе / ​​издателе.
  • Эти сообщения также актуальны ( Post1 - Post2 )
  • sock.setsockopt(zmq.CONFLATE, 1)это еще один вариант, чтобы получить только последнее сообщение, определенное на стороне подписчика.
...