Синхронизированный ZeroMQ PUB-SUB: Абонент зависает с синхронизацией, но в противном случае работает нормально - PullRequest
0 голосов
/ 02 ноября 2018

Я использую ZeroMQ для установления модели связи между издателем и подписчиком.
Издатель создает контекст zmq, а затем открывает сокет с PUB
шаблон общения. Затем он привязывается к порту, так как используется транспортный протокол TCP. Для синхронизации открывается отдельный сокет с шаблоном связи REP, который связывается по другому пути. Если не получен запрос синхронизации в msg = syncservice.recv(), программа не может продолжаться. Затем он выполняет некоторую элементарную работу и начинает снова. Вот код для издателя:

import pickle, zmq, random, string

# Wait for 1 subscriber
SUBSCRIBERS_EXPECTED = 1

def randomword(length):
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(length))

while True:
    try:
        arguments = {}
        data = {}
        context = zmq.Context()

        # Socket to talk to clients
        publisher = context.socket(zmq.PUB)
        # set SNDHWM, in case of slow subscribers
        publisher.sndhwm = 1100000
        publisher.bind('tcp://*:5561')

        # Socket to receive signals
        syncservice = context.socket(zmq.REP)
        syncservice.bind('tcp://*:5562')

        # Get synchronization from subscribers
        subscribers = 0
        while subscribers < SUBSCRIBERS_EXPECTED:
            # wait for synchronization request
            msg = syncservice.recv()
            # send synchronization reply
            syncservice.send(b'')
            subscribers += 1

        for n in range(1000):
            for i in range(random.randrange(1, 6)):
                arguments[i] = randomword(random.randrange(2, 10))

            data['func_name_' + str(n)] = randomword(8)
            data['arguments_' + str(n)] = arguments

        data_string = pickle.dumps(data)
        publisher.send(data_string)

    except KeyboardInterrupt:
        print("Interrupt received, stopping...")
        break

Подписчик работает почти так же, как и издатель, хотя с точки зрения подписчика. Вот код для подписчика:

import pickle, zmq, pprint, time

context = zmq.Context()

# Connect the subscriber socket
subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://localhost:5561')
subscriber.setsockopt(zmq.SUBSCRIBE, b'')

time.sleep(1)

# Synchronize with publisher
syncclient = context.socket(zmq.REQ)
syncclient.connect('tcp://localhost:5562')

# Initialize poll set
poller = zmq.Poller()
poller.register(syncclient, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

# send a synchronization request
syncclient.send(b'')

while True:
    try:
        socks = dict(poller.poll())

    except KeyboardInterrupt:
        print("Interrupt received, stopping...")
        break

    # wait for synchronization reply
    if syncclient in socks:
        syncclient.recv()
        print('Sync')

    if subscriber in socks:
        msg = subscriber.recv()
        data = pickle.loads(msg)
        pprint.pprint(data)
        syncclient.send(b'')

Желаемый результат будет для издателя бесконечно публиковать, а Абонент постоянно получает и печатает все. Если я удалю часть синхронизации, все работает как положено. Если я продолжу синхронизацию часть абонента зависает после ряда передач. Интересная вещь является то, что если я отправляю прерывание клавиатуры (Ctrl-C), а затем перезапустить абонента, он снова получит пару передач и снова зависнет, и так далее, и так далее п.

Я пробовал разные настройки верхнего водяного знака, но это не имело никакого значения. Я попытался закрыть сокеты и завершить контекст после каждого цикла. Я проверял, были ли чрезмерные накладные расходы от печати или травления (сериализации), но это тоже не было. Я также модифицировал пример самоубийства улитки для работы в этом случае, но подписчик не умер. Что мне не хватает? (Python 3 используется для каждого примера)

...