У меня есть процесс A, который постоянно публикует сообщение, а процессы B и C подписываются на тему и получают последнее сообщение, опубликованное издателем в процессе A.
Итак, я установил zmq.CONFLATE
для обоихиздатель и подписчик.Однако я обнаружил, что один подписчик не мог получать сообщения.
def publisher(sleep_time=1.0, port="5556"):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.bind("tcp://*:%s" % port)
print ("Running publisher on port: ", port)
while True:
localtime = time.asctime( time.localtime(time.time()))
string = "Message published time: {}".format(localtime)
socket.send_string("{}".format(string))
time.sleep(sleep_time)
def subscriber(name="sub", sleep_time=1, ports="5556"):
print ("Subscriber Name: {}, Sleep Time: {}, Port: {}".format(name, sleep_time, ports))
context = zmq.Context()
print ("Connecting to publisher with ports %s" % ports)
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.setsockopt_string(zmq.SUBSCRIBE, "")
socket.connect ("tcp://localhost:%s" % ports)
while True:
message = socket.recv()
localtime = time.asctime( time.localtime(time.time()))
print ("\nSubscriber [{}]\n[RECV]: {} at [TIME]: {}".format(name, message, localtime))
time.sleep(sleep_time)
if __name__ == "__main__":
Process(target=publisher).start()
Process(target=subscriber, args=("SUB1", 1.2, )).start()
Process(target=subscriber, args=("SUB2", 1.1, )).start()
Я попытался сбросить socket.setsockopt(zmq.CONFLATE, 1)
в издателе, и это, похоже, решило проблему.Оба подписчика в процессах B и C могли получать сообщения, и эти сообщения казались самыми последними.
Я пытаюсь выяснить, почему установка издателя с CONFLATE
вызвала проблему, с которой я столкнулся.Я не мог найти информацию об этом.Кто-нибудь знает, что вызывает такое поведение?
Кроме того, я хочу знать, в случае одного издателя для нескольких подписчиков, какова правильная настройка кода, чтобы подписчик всегда мог получать последние сообщения?