ZMQ- python: PUSH / PULL с одним ко многим - PullRequest
1 голос
/ 25 марта 2020

Я пробую zmq со следующими кодами, но подписчики получают объекты один за другим.

Ниже приведен мой сценарий PU SH:

# zmq server -- run it once

import zmq
import time
# server
# print(zmq.Context)
ctx = zmq.Context()
sock = ctx.socket(zmq.PUSH)
sock.bind('ipc:///tmp/zmqtest')
i=0
while True:
    i+=1
    time.sleep(0.5)
    sock.send_pyobj((i))

и следующие такое сценарий PULL:

# zmq client -- run it 2,3 times in parallel

import zmq
ctx = zmq.Context() # create a new context to kick the wheels
sock = ctx.socket(zmq.PULL)
sock.connect('ipc:///tmp/zmqtest')

i=0
while True:
    i+=1
    o = sock.recv_pyobj()
    print('received python object:', o,i)
    if o == 'quit':
        print('exiting.')
        break

Я получаю следующие выходные данные из одного из сценариев PULL:

received python object: 1 1
received python object: 3 2
received python object: 5 3
received python object: 7 4

Как я могу вывести sh объекты в оба сценария параллельно? Я пробовал PUB / SUB, но он не работает таким образом. (можете проверить замену PUSH/PULL на PUB/SUB)

1 Ответ

0 голосов
/ 26 марта 2020

PUB- сторона:

# zmq PUB-server -- run it once

import zmq
import time

IPC  = 'ipc:///tmp/zmqtest'
ctx  = zmq.Context()
PUB  = ctx.socket( zmq.PUB )
PUB.bind( IPC )
#------------------------------------------------- SELF-DEFENSIVE CONFIGURATION
PUB.setsockopt( zmq.LINGER, 0 )
PUB.setsockopt( zmq...        )
#------------------------------------------------------------------------------
i = 0
while True:
    i += 1
    time.sleep( 0.5 )
    sock.send_pyobj( ( i ) )
#------------------------------------------------------------------------------

SUB сторона (и):

# zmq SUB-client -- run x-times concurrently ( or distributed, if other TransportClasses permit )

import zmq

IPC = 'ipc:///tmp/zmqtest'        # <TransportClass>://<address>, TCP,TIPC,...may follow
ctx = zmq.Context()               # create a new context to kick the wheels
SUB = ctx.socket( zmq.SUB )
SUB.connect( IPC )
#------------------------------------------------- SELF-DEFENSIVE CONFIGURATION
SUB.setsockopt( zmq.LINGER,     0 )
SUB.setsockopt( zmq.SUBSCRIBE, "" )
SUB.setsockopt( zmq...            )
#------------------------------------------------------------------------------
i    = 0
aClk = zmq.Stopwatch()
MASK = '(i:{1:_>9d}): After{2:_>+12d} [us] did .recv() a python object:[{0:}]'
while True:
    i += 1
    aClk.start()
    o = sock.recv_pyobj()
    _ = aClk.stop()
    print( MASK.format( repr( o ), i, _ ) )

    if o == 'quit':
        print( 'Will exit.' )
        #--------------------------------------- BE NICE & FAIR TO RESOURCES
        SUB.setsockopt( zmq.UNSUBSCRIBE, "" )
        SUB.disconnect( IPC )
        SUB.close()
        ctx.term()
        #-------------------------------------------------------------------
        break

"да. Мне нужно иметь каждый объект отправлено обоим (или многим) сценариям без потерь"

Имейте в виду, на это действует нулевая гарантия. Для этого можно создать собственный протокол уровня приложения.

...