Ограничение длины очереди с PyZMQ - PullRequest
5 голосов
/ 22 февраля 2012

Я хочу ограничить объем памяти, используемой моими очередями сообщений ZeroMQ в приложении Python.Я знаю, что установка верхней отметки ограничит количество, которое будет помещено в очередь на стороне отправителя, но есть ли способ контролировать, сколько будет стоять в очереди на стороне получателя?Кажется, что для привязки Python ZeroMQ установлено неограниченное значение.

Мой тестовый сценарий: у меня есть два терминала Python, которые я использую для тестирования.Один - получатель:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context = zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.bind("tcp://127.0.0.1:12345")

Другой - отправитель:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PUSH)
>>> socket.setsockopt(zmq.SNDBUF, 2048)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.connect("tcp://127.0.0.1:12345")
>>> num = 0
>>> while True:
...  print num
...  socket.send(str(num))
...  num = num + 1
... 

Я запускаю socket.recv() на стороне получателя пару раз, чтобы убедиться, что очередь работает, нокроме этого, пусть два терминала просто сидят там.Похоже, что цикл отправки никогда не блокируется, а приглашение получения, похоже, занимает все больше места в памяти.

Ответы [ 3 ]

3 голосов
/ 22 февраля 2012

В отличие от документации ZeroMQ, отметка максимальной воды должна быть установлена ​​как на стороне PUSH, так и на стороне PULL. Как только я изменил PULL, он работал лучше. Новый код PULL:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.bind("tcp://127.0.0.1:12345")
0 голосов
/ 16 сентября 2018

С помощью опций zmq.SNDBUF и zmq.RCVBUF вы можете установить ограничение на размер буфера .


Кроме того, я использую опцию zmq.CONFLATE на стороне получателя для ограниченияразмер очереди ZeroMQ в один:

Вот пример с ZMQ PUSH/PULL:

Отправитель (zmq.PUSH):

def create_pub_socket(ip, port):
    try:
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.setsockopt(zmq.SNDHWM, 1)
        zmq_address = "tcp://{}:{}".format(ip, port)
        socket.connect(zmq_address)
        return socket

    except zmq.ZMQError as exp:
        print(exp)
        return False

sock = create_push_socket('127.0.0.1', 5558)
if sock:
    sock.send_json({'a': 1})

Getter (zmq.PULL):

def listen(self):
    sock = None
    try:
        context = zmq.Context()
        sock = context.socket(zmq.PULL)
        sock.setsockopt(zmq.RCVHWM, 1)
        sock.setsockopt(zmq.CONFLATE, 1)  # last msg only.
        sock.bind("tcp://*:5558")

    except zmq.ZMQError:
        logger.captureException()

    configs = None
    while configs is None:
        if sock:
            configs = sock.recv_json()
            time.sleep(1e-1)
        else:
            time.sleep(5)
            listen()  # Recursive.
listen()
0 голосов
/ 25 марта 2015

На самом деле документация гласит:

"Когда сокет ZMQ_PUSH входит в исключительное состояние из-за наличия достиг максимальной отметки для всех узлов ниже по течению, или если есть нет нисходящих узлов вообще, тогда любые операции zmq_send (3) на Сокет должен блокироваться до тех пор, пока не закончится исключительное состояние или хотя бы один нисходящий узел становится доступным для отправки; сообщения не отбрасываются. "

http://api.zeromq.org/2-1:zmq-socket

Что прямо говорит о том, что вы можете (и должны) устанавливать отметку максимальной воды для узлов ниже по течению (иначе тяга), и, возможно, подразумевает, что установка ее на стороне толчка не будет иметь никакого эффекта (хотя я подозреваю, что это не так, потому что до сих пор доступны нисходящие узлы, но сообщения поступают быстрее, чем они могут быть отправлены.)

...