Гнездо ZeroMQ PUB буферизует все мои исходящие данные при подключении - PullRequest
13 голосов
/ 21 января 2012

Я заметил, что сокет ZEROMQ PUB буферизует все исходящие данные, если он подключается, например,

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print sub.recv()

Суб привязка после этих сообщений, они должны быть удалены, потому что PUB должен отбрасывать сообщения, если нетодин связан с этим.Но вместо того, чтобы отбрасывать сообщения, он буферизует все сообщения.

a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi

Как видите, эти "сообщения не должны быть отброшены" буферизуются сокетом, как только он подключается, он сбрасывает их в SUB.разъем.Если я подключусь к разъему PUB и подключусь к разъему SUB, он будет работать правильно.

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print repr(sub.recv())

И вы увидите только вывод

'hi'

Это странное поведениевызвать проблему, он буферизует все данные на соединительном сокете, у меня есть два сервера, сервер A публикует данные на сервер B

Server A -- publish --> Server B

Он работает нормально, если сервер B подключается к сети.Но что, если я запустил Сервер A и не запустил Сервер B?

В результате соединительный разъем PUB на Сервере A хранит все эти данные, использование памяти становится все выше и выше.

Вот в чем проблема, это поведение или ошибка?Если это особенность, где я могу найти документ, в котором упоминается такое поведение?И как я могу остановить подключение PUB сокета буферизует все данные?

Спасибо.

Ответы [ 6 ]

6 голосов
/ 22 января 2012

Зависит ли сокет от сообщений или отбрасывает сообщения, зависит от типа сокета, как описано в документации ZMQ :: Socket (выделение ниже мое):

ZMQ :: HWM: получение отметки максимальной воды

Опция ZMQ :: HWM извлекает верхнюю отметку для указанный сокет. Верхняя отметка является жестким пределом максимума количество ожидающих сообщений 0MQ должно стоять в памяти для любого один узел, с которым общается указанный сокет.

Если этот предел достигнут, розетка должна ввести исключительное состояние и в зависимости от типа сокета, 0MQ должен принимать соответствующие действие, например блокирование или удаление отправленных сообщений. См. описание отдельных сокетов в ZMQ :: Socket для подробностей о точном действие, предпринятое для каждого типа сокета.

Значение по умолчанию ZMQ :: HWM, равное нулю, означает «без ограничений».

Вы можете увидеть, будет ли он блокироваться или удаляться, просмотрев документацию для типа сокета для ZMQ::HWM option action, который будет либо Block, либо Drop.

Действие для ZMQ::PUB - Drop, поэтому, если оно не падает, вы должны проверить значение HWM (High Water Mark) и учесть предупреждение о том, что Значение по умолчанию ZMQ :: HWM, равное нулю, означает « без ограничений », , означающее, что оно не войдет в исключительное состояние, пока в системе не закончится память (в этот момент я не знаю, как она себя ведет).

4 голосов
/ 04 мая 2012

Я чувствую, что это поведение - семантика zmq_connect ().То есть: когда zmq_connect () возвращает успех, тогда концептуально устанавливается соединение, и, таким образом, ваш PUB-соединение начинает ставить сообщение в очередь вместо отбрасывания .

После выдержки из " ZMQРуководство"является подсказкой для этого:

В теории с гнездами ØMQ не имеет значения, какой конец соединяется, а какой конец соединяется.Однако в случае разъемов PUB-SUB, если вы связываете разъем SUB и подключаете разъем PUB, сокет SUB может получать старые сообщения, то есть сообщения, отправленные до запуска SUB. Это артефакт работы bind / connect. Лучше всего связать PUB и подключить SUB, если можете.

Следующий раздел в zmq_connect *У 1016 * () есть несколько советов, показанных ниже:

Основные отличия от обычных сокетов

Вообще говоря, обычные сокеты представляют собой синхронный интерфейс с любым из ориентированных на соединение надежных потоков байтов (SOCK_STREAM) или ненадежные датаграммы без установления соединения (SOCK_DGRAM).Для сравнения, сокеты ØMQ представляют собой абстракцию асинхронной очереди сообщений с точной семантикой очереди, зависящей от используемого типа сокета.Там, где обычные сокеты передают потоки байтов или дискретные дейтаграммы, сокеты ØMQ передают дискретные сообщения.

Сокеты ØMQ, являющиеся асинхронными, означают, что временные параметры настройки физического соединения, разрыва, повторного соединения и эффективной доставки прозрачны дляпользователь и организован самим ØMQ.Кроме того, сообщения могут быть поставлены в очередь в случае, если одноранговый узел недоступен для их получения.

1 голос
/ 22 января 2012

Они устанавливают опцию HWM на сокете.

0 голосов
/ 22 сентября 2013

Вот хак, который может помочь ...

Установите для ZMQ::HWM фиксированное число, скажем, 10. При подключении вызывайте метод recv сокета абонента в цикле, пока он не сбросит всебуферизованные сообщения, и только тогда запускается ваш основной цикл приема.

0 голосов
/ 22 января 2012

Вы должны быть в состоянии установить верхнюю отметку в гнезде, используя параметр hwm для гнезда паба. Позволяет определить, сколько сообщений хранится.

0 голосов
/ 21 января 2012

Таким образом, bind () и connect () приводят к двум различным поведениям.Почему бы вам просто не выбрать, какой из них вы предпочитаете (это похоже на bind ()), и использовать его?

В действительности функция ZeroMQ в целом заключается в том, что она буферизует исходящие сообщения до тех пор, пока не будет установлено соединение.

...