Отказоустойчивая рассылка сообщений для использования конкретным c получателем с использованием redis и python - PullRequest
2 голосов
/ 16 февраля 2020

Итак, Redis 5.0 недавно представил новую функцию под названием Streams . Похоже, они идеально подходят для распространения сообщений для межпроцессного взаимодействия:

  • они превосходят возможности обмена сообщениями о событиях PUB / SUB с точки зрения надежности: PUB / SUB - это запуск и забывание, гарантии нет. получатель получит сообщение
  • списки редисов несколько низкоуровневые, но все еще могут быть использованы. Тем не менее, потоки оптимизированы для производительности и в точности описанного выше варианта использования.

Однако, поскольку эта функция является довольно новой, существует только какое-либо руководство по Python (или даже общее повторное редактирование) и Я не совсем понимаю, как адаптировать потоковую систему к своему варианту использования.

Я хочу иметь одну программу издателя, которая отправляет сообщения в поток и содержит информацию о получателе (например, recipient: "user1"). Затем у меня будет несколько процессов получения, которые все должны проверить на наличие новых потоковых сообщений и сравнить, являются ли они целевым получателем. Если это так, они должны обработать сообщение и пометить его как обработанное (подтвержденное).

Тем не менее, мне не совсем понятно представление о группах потребителей, состоянии ожидания и так далее. Кто-нибудь может дать мне реальный пример моего маленького псевдокода?

sender.py

db = Redis(...)
db.the_stream.add({"recipient": "user1", "task": "be a python"})

receient.py (там будет много экземпляров, каждый из которых будет работать с уникальным идентификатором получателя)

recipient_id = "user1" # you get the idea...
db = Redis(...)
while True:
    message = db.the_stream.blocking_read("$") # "$" somehow means: just receive new messages
    if message.recipient == recipient_id:
        perform_task(message.task)
        message.acknowledge() # let the stream know it was processed
    else:
        pass # well, do nothing here since it's not our message. Another recipient instance should do the job.```

1 Ответ

1 голос
/ 21 февраля 2020

Используя приведенный пример и псевдокод, давайте представим, что:

  • recipient.user1 получает 60 сообщений в минуту
  • , а метод perform_task() занимает 2 секунды, чтобы выполнить.

То, что здесь произойдет, очевидно: задержка между поступлением нового сообщения и его обработкой будет только расти со временем, все больше отдаляясь от «обработки в реальном времени».

system throughput = 30 messages/minute

Чтобы обойти это, вы можете создать группу потребителей для user1. Здесь вы могли бы иметь 4 отдельных python процесса, работающих параллельно, и все 4 были объединены в одну группу для user1. Теперь, когда приходит сообщение для user1, один из 4 рабочих его подберет и perform_task().

system throughput = 120 message/minute

В вашем примере message.acknowledge() не на самом деле существует, потому что ваш потоковый читатель один (команды XREAD).

Если бы это была группа, подтверждение сообщений становится необходимым, вот как Redis знает, что один из членов группы действительно обработал это сообщение, поэтому он может «двигаться дальше» (он может забыть тот факт, что это сообщение ожидало подтверждения). Когда вы используете группы, существует небольшая часть серверной логики c для обеспечения того, чтобы каждое сообщение доставлялось одной из рабочих групп потребителей один раз (команды XGROUPREAD). Когда клиент завершил работу, он выдает подтверждение этого сообщения (команды XACK), чтобы «буфер группы потребителей» на стороне сервера мог удалить его и двигаться дальше.

Представьте, если работник умер и никогда не подтвердил сообщение , С группой потребителей вы можете следить за этой ситуацией (используя команды XPENDING) и реагировать на них, например, повторяя попытку обработать то же сообщение у другого потребителя.

Когда вы не используете группы серверу redis не нужно «двигаться дальше», «подтверждение» становится на 100% клиентской / бизнес логи c.

...