Что я пытаюсь достичь?
Надежный PUB / SUB с использованием zmq, куда подписчики могут приходить и уходить, когда они пожелают.
И, чтобы смоделировать эту модель, более или менее.
Я понимаю, что этот вопрос задавался раньше.
Мой отличается, потому что следующий код -
- Выполняет явную синхронизацию с использованием
ROUTER-DEALER
, чтобы избежать синдрома медленного соединения.
Не сразу закрывает Context
/ Socket
после send()
.
import zmq
import time
import multiprocessing
from pathlib import Path
ADDR = f"ipc://{Path.home()}/.tmp/pubsubtest"
ADDR2 = f"ipc://{Path.home()}/.tmp/pairtest"
def server():
with zmq.Context() as ctx, \
ctx.socket(zmq.PUB) as pub, \
ctx.socket(zmq.ROUTER) as router:
pub.bind(ADDR)
router.bind(ADDR2)
i = 0
while True:
router.send_multipart(router.recv_multipart())
pub.send_pyobj(i)
# Client's don't recv anything without this!
time.sleep(0.1)
print("sent:", i)
i += 1
def client():
with zmq.Context() as ctx, \
ctx.socket(zmq.SUB) as sub, \
ctx.socket(zmq.DEALER) as dealer:
sub.setsockopt(zmq.SUBSCRIBE, b"")
sub.connect(ADDR)
dealer.connect(ADDR2)
dealer.send(b"")
dealer.recv()
print("recv:", sub.recv_pyobj())
def simulator():
p = multiprocessing.Process(target=server)
try:
p.start()
while True:
client()
finally:
p.terminate()
p.join()
simulator()