паттерны постоянства zeromq - PullRequest
11 голосов
/ 30 октября 2010

Кто должен управлять персистентом в ZeroMQ?

Когда мы используем клиенты ZeroMQ на языке Python, какие плагины / модули доступны для управления персистентностью?

Я хотел бы знать шаблоны для использования ZeroMQ.

Ответы [ 3 ]

9 голосов
/ 07 декабря 2010

Насколько я знаю, у Zeromq нет постоянства.Это выходит за рамки его применения и должно обрабатываться конечным пользователем.Так же, как сериализация сообщения.В C # я использовал db4o, чтобы добавить постоянство.Обычно я сохраняю объект в исходном состоянии, затем сериализую его и отправляю в сокет ZMQ.Кстати, это было для пары PUB / SUB.

3 голосов
/ 07 марта 2012

В конце приложения вы можете сохраниться соответствующим образом, например, я построил слой постоянства в node.js, который связывался с внутренними вызовами php и через веб-сокеты.

Аспект устойчивости удерживал сообщения в течение определенного периода времени (http://en.wikipedia.org/wiki/Time_to_live) это было для того, чтобы дать клиентам возможность подключиться. Я использовал структуры данных в памяти, но мне нравилась идея использовать redis для получения настойкость диска.

1 голос
/ 05 мая 2018

Нам нужно было сохранить полученные сообщения от подписчика перед их обработкой.Сообщения принимаются в отдельном потоке и хранятся на диске, а в основном потоке манипулируется с сохраненной очередью сообщений.

Модуль доступен по адресу: https://pypi.org/project/persizmq. Из документации:

import pathlib

import zmq

import persizmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")

persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)

def on_exception(exception: Exception)->None:
    print("an exception in the listening thread: {}".format(exception))

with persizmq.ThreadedSubscriber(
    callback=storage.add_message, subscriber=subscriber, 
    on_exception=on_exception):

    msg = storage.front()  # non-blocking
    if msg is not None:
        print("Received a persistent message: {}".format(msg))
        storage.pop_front()
...