Я использую ZeroMQ для установления модели связи между издателем и подписчиком.
Издатель создает контекст zmq, а затем открывает сокет с PUB
шаблон общения. Затем он привязывается к порту, так как используется транспортный протокол TCP. Для синхронизации открывается отдельный сокет с шаблоном связи REP, который связывается по другому пути. Если не получен запрос синхронизации в msg = syncservice.recv()
, программа не может продолжаться. Затем он выполняет некоторую элементарную работу и начинает снова. Вот код для издателя:
import pickle, zmq, random, string
# Wait for 1 subscriber
SUBSCRIBERS_EXPECTED = 1
def randomword(length):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(length))
while True:
try:
arguments = {}
data = {}
context = zmq.Context()
# Socket to talk to clients
publisher = context.socket(zmq.PUB)
# set SNDHWM, in case of slow subscribers
publisher.sndhwm = 1100000
publisher.bind('tcp://*:5561')
# Socket to receive signals
syncservice = context.socket(zmq.REP)
syncservice.bind('tcp://*:5562')
# Get synchronization from subscribers
subscribers = 0
while subscribers < SUBSCRIBERS_EXPECTED:
# wait for synchronization request
msg = syncservice.recv()
# send synchronization reply
syncservice.send(b'')
subscribers += 1
for n in range(1000):
for i in range(random.randrange(1, 6)):
arguments[i] = randomword(random.randrange(2, 10))
data['func_name_' + str(n)] = randomword(8)
data['arguments_' + str(n)] = arguments
data_string = pickle.dumps(data)
publisher.send(data_string)
except KeyboardInterrupt:
print("Interrupt received, stopping...")
break
Подписчик работает почти так же, как и издатель, хотя
с точки зрения подписчика. Вот код для подписчика:
import pickle, zmq, pprint, time
context = zmq.Context()
# Connect the subscriber socket
subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://localhost:5561')
subscriber.setsockopt(zmq.SUBSCRIBE, b'')
time.sleep(1)
# Synchronize with publisher
syncclient = context.socket(zmq.REQ)
syncclient.connect('tcp://localhost:5562')
# Initialize poll set
poller = zmq.Poller()
poller.register(syncclient, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)
# send a synchronization request
syncclient.send(b'')
while True:
try:
socks = dict(poller.poll())
except KeyboardInterrupt:
print("Interrupt received, stopping...")
break
# wait for synchronization reply
if syncclient in socks:
syncclient.recv()
print('Sync')
if subscriber in socks:
msg = subscriber.recv()
data = pickle.loads(msg)
pprint.pprint(data)
syncclient.send(b'')
Желаемый результат будет для издателя бесконечно публиковать, а
Абонент постоянно получает и печатает все. Если я удалю
часть синхронизации, все работает как положено. Если я продолжу синхронизацию
часть абонента зависает после ряда передач. Интересная вещь
является то, что если я отправляю прерывание клавиатуры (Ctrl-C), а затем перезапустить абонента,
он снова получит пару передач и снова зависнет, и так далее, и так далее
п.
Я пробовал разные настройки верхнего водяного знака, но это не имело никакого значения. Я попытался закрыть сокеты и завершить контекст после каждого цикла. Я проверял, были ли чрезмерные накладные расходы от печати или травления (сериализации), но это тоже не было. Я также модифицировал пример самоубийства улитки для работы в этом случае, но подписчик не умер. Что мне не хватает? (Python 3 используется для каждого примера)