zmq PUB / SUB отбрасывает сообщения, когда `sleep ()` не выполняется после `send ()` - PullRequest
0 голосов
/ 13 ноября 2018

Что я пытаюсь достичь?

Надежный PUB / SUB с использованием zmq, куда подписчики могут приходить и уходить, когда они пожелают.

И, чтобы смоделировать эту модель, более или менее.

enter image description here

Я понимаю, что этот вопрос задавался раньше.

Мой отличается, потому что следующий код -

  • Выполняет явную синхронизацию с использованием ROUTER-DEALER, чтобы избежать синдрома медленного соединения.
  • Не сразу закрывает Context / Socket после send().

    import zmq
    import time
    import multiprocessing
    from pathlib import Path
    
    ADDR = f"ipc://{Path.home()}/.tmp/pubsubtest"
    ADDR2 = f"ipc://{Path.home()}/.tmp/pairtest"
    
    
    def server():
        with zmq.Context() as ctx, \
                ctx.socket(zmq.PUB) as pub, \
                ctx.socket(zmq.ROUTER) as router:
    
            pub.bind(ADDR)
            router.bind(ADDR2)
    
            i = 0
            while True:
                router.send_multipart(router.recv_multipart())
                pub.send_pyobj(i)
    
                # Client's don't recv anything without this!
                time.sleep(0.1)                  
    
                print("sent:", i)
    
                i += 1
    
    
    
    def client():
        with zmq.Context() as ctx, \
                ctx.socket(zmq.SUB) as sub, \
                ctx.socket(zmq.DEALER) as dealer:
    
            sub.setsockopt(zmq.SUBSCRIBE, b"")
            sub.connect(ADDR)                
            dealer.connect(ADDR2)                
    
            dealer.send(b"")
            dealer.recv()
    
            print("recv:", sub.recv_pyobj())
    
    
    def simulator():
        p = multiprocessing.Process(target=server)
        try:
            p.start()    
            while True:
                client()
        finally:
            p.terminate()
            p.join()
    
    simulator()
    
...