Azure Поведение очереди доставляет одинаковые сообщения нескольким работникам одновременно - PullRequest
0 голосов
/ 12 марта 2020

Я имею дело с какой-то странной проблемой, используя Azure Очереди. У нас есть очередь с несколькими работниками. Поведение является простым:

  • Очередь опроса для сообщений
  • Если есть сообщение: обработать сообщение (невидимость по умолчанию = 30 с)
  • Через 15 секунд, если мы все еще обрабатываю сообщение. Сбросьте время ожидания видимости до 30 секунд
  • Повторяйте предыдущий шаг до завершения обработки сообщения
  • Удаление сообщения из очереди

Повторный запрос выполняется следующим образом:

    def _reaquire_if_necessary(self, active_message: QueueMessage):
        if self._elapsed() > VISIBILITY_TIMEOUT_RESET_THRESHOLD:
            logger.info(f"Reacquiring message with id: {active_message.id}")
            try:
                active_message = self._queue.update_message(
                    active_message,
                    visibility_timeout=VISIBILITY_TIMEOUT_QUEUE
                )
            except HttpResponseError as http_error:
                raise ReaquireException(f'Could not reacquire message: {active_message.id}') from http_error

            self._start = time.time()
        return active_message

Основной l oop, опрашивающий очередь и извлекающий сообщения, работает следующим образом:

            messages = list(self._queue.receive_messages(
                messages_per_page=1,
                visibility_timeout=VISIBILITY_TIMEOUT_QUEUE
            ))
            if not messages:
                if active:
                    active = False
                    logger.info('No messages in queue. Entering INACTIVE state and polling mode.')

                time.sleep(POLL_FREQUENCY_QUEUE)
                continue
            else:
                if not active:
                    logger.info("New message in queue. Resuming ACTIVE state.")
                    active=True

            self._start = time.time()
            for message in messages:
                try:
                    self._process_message(message)
                except ReaquireException as reaquire_error:
                    logger.warning(f'Message {message.id} NotFound on reacquire. Message will be ignored.')
                except Exception as e:
                    logger.exception(f"Failed processing message: {message.content} - "
                                     f"retry {failed_messages[message.id]}/{RETRY_COUNT}")
                    # ...moving to dead-letter queue...

Когда мы обновляем видимость очереди, метод update_message() возвращает объект сообщения. Мы переназначаем active_message этому сообщению. Таким образом, любые удаления, обновления и т. Д. c отправляются из самого современного объекта сообщения.


Теперь вот наша проблема:

Когда очередь заполняется сообщения, через некоторое время, в какой-то момент, один из потребителей получит то же сообщение , что и другой работник . Иногда даже ... 10-15 секунд между ними, получающими одно и то же сообщение.

Первый рабочий преуспеет на _reacquire_if_necessary и обновит невидимость. Второй работник потерпит неудачу.

Так хорошо ... мы можем справиться с этим. Просто получите следующее сообщение, без вреда, без фола. Но здесь это становится действительно раздражающим. Теперь второй работник пропустит все сообщения во всей очереди. Получите все сообщения один раз, но с ошибкой NotFound на update_message для каждого полученного сообщения. После зацикливания всех сообщений он может подождать, пока сообщения станут видимыми, а затем просто снова начать обычную обработку.

Я не знаю, что вызывает такое поведение, и не могу объяснить его. Похоже, что azure очереди не делают сообщения невидимыми правильно, когда они извлекаются работником. И в то же время кажется, что работники могут получать сообщения, которые должны быть невидимыми.

Есть ли в нашей схеме доступа что-то не так? Или кто-нибудь может объяснить это поведение и указать, как его решить?

Ожидаемое поведение :

Ожидается следующее:

  • Если сообщение получено работником, оно не отправляется одновременно другому работнику, а сразу становится невидимым
  • Если работник получает сообщение. Любые последующие вызовы update_message в окне невидимости должны быть успешными. Потому что у нас есть всплывающая квитанция для этого сообщения.
  • Невидимые сообщения ... невидимы ... и не могут быть получены ... пока, вы знаете, снова не станут видимыми. Пожалуйста.

Заранее спасибо.


Пример вывода:

2020-03-12 19:19:30,548 - denormalize.py - INFO - Received message [identifier: idx-source/index_denorm.csv/part-00021]
2020-03-12 19:20:01,953 - denormalize.py - INFO - Reacquiring message with id: 77760271-16fc-4f79-82cc-9e38709960f9
2020-03-12 19:20:32,763 - denormalize.py - INFO - Reacquiring message with id: 77760271-16fc-4f79-82cc-9e38709960f9
2020-03-12 19:21:03,248 - denormalize.py - INFO - Reacquiring message with id: 77760271-16fc-4f79-82cc-9e38709960f9
2020-03-12 19:21:33,358 - denormalize.py - INFO - Reacquiring message with id: 77760271-16fc-4f79-82cc-9e38709960f9
2020-03-12 19:22:03,758 - denormalize.py - INFO - Reacquiring message with id: 77760271-16fc-4f79-82cc-9e38709960f9
2020-03-12 19:22:34,058 - denormalize.py - INFO - Reacquiring message with id: 77760271-16fc-4f79-82cc-9e38709960f9
2020-03-12 19:23:04,260 - denormalize.py - INFO - Reacquiring message with id: 77760271-16fc-4f79-82cc-9e38709960f9
2020-03-12 19:23:34,753 - denormalize.py - INFO - Reacquiring message with id: 77760271-16fc-4f79-82cc-9e38709960f9
2020-03-12 19:24:05,057 - denormalize.py - INFO - Reacquiring message with id: 77760271-16fc-4f79-82cc-9e38709960f9
2020-03-12 19:24:35,462 - denormalize.py - INFO - Reacquiring message with id: 77760271-16fc-4f79-82cc-9e38709960f9
2020-03-12 19:25:01,476 - denormalize.py - INFO - Completed message [identifier: idx-source/index_denorm.csv/part-00021]: 11102 tokens - elapsed 325.10 seconds
2020-03-12 19:25:01,649 - denormalize.py - INFO - Received message [identifier: idx-source/index_denorm.csv/part-00022]
2020-03-12 19:25:07,755 - denormalize.py - INFO - Reacquiring message with id: 5e274e21-98c2-484c-9502-d69117b86be0
2020-03-12 19:25:07,856 - denormalize.py - WARNING - Message 5e274e21-98c2-484c-9502-d69117b86be0 NotFound on reacquire. Message will be ignored.
2020-03-12 19:25:07,857 - denormalize.py - INFO - Received message [identifier: idx-source/index_denorm.csv/part-00023]
2020-03-12 19:25:13,362 - denormalize.py - INFO - Reacquiring message with id: e24cdb8f-644d-408d-b18d-a53f68a5cef2
2020-03-12 19:25:13,553 - denormalize.py - WARNING - Message e24cdb8f-644d-408d-b18d-a53f68a5cef2 NotFound on reacquire. Message will be ignored.
2020-03-12 19:25:13,553 - denormalize.py - INFO - Received message [identifier: idx-source-2/index_denorm.csv/part-00001]
2020-03-12 19:25:17,950 - denormalize.py - INFO - Reacquiring message with id: 5224510b-9270-4730-bd30-b47ee4366668
...
2020-03-12 19:26:55,881 - denormalize.py - INFO - Received message [identifier: idx-source/index_denorm.csv/part-00020]
2020-03-12 19:26:55,898 - denormalize.py - INFO - Reacquiring message with id: cf3a59ab-17f6-47ad-a62b-812dd5b69ca6
2020-03-12 19:26:55,948 - denormalize.py - WARNING - Message cf3a59ab-17f6-47ad-a62b-812dd5b69ca6 NotFound on reacquire. Message will be ignored.
2020-03-12 19:26:55,948 - denormalize.py - INFO - Received message [identifier: idx-source/index_denorm.csv/part-00022]
2020-03-12 19:26:55,964 - denormalize.py - INFO - Reacquiring message with id: c5bff532-1c9e-403e-9074-db709ef9d4b7
2020-03-12 19:26:55,975 - denormalize.py - WARNING - Message c5bff532-1c9e-403e-9074-db709ef9d4b7 NotFound on reacquire. Message will be ignored.
2020-03-12 19:26:55,975 - denormalize.py - INFO - Received message [identifier: idx-source/index_denorm.csv/part-00023]
2020-03-12 19:26:55,991 - denormalize.py - INFO - Reacquiring message with id: b5ffb8c6-d76c-42d7-8a59-1fccc0404db5
2020-03-12 19:26:56,048 - denormalize.py - WARNING - Message b5ffb8c6-d76c-42d7-8a59-1fccc0404db5 NotFound on reacquire. Message will be ignored.
2020-03-12 19:26:56,063 - denormalize.py - INFO - No messages in queue. Entering INACTIVE state and polling mode.
2020-03-12 19:27:06,900 - denormalize.py - INFO - New message in queue. Resuming ACTIVE state.
2020-03-12 19:27:06,900 - denormalize.py - INFO - Received message [identifier: idx-source-2/index_denorm.csv/part-00004]
2020-03-12 19:27:37,065 - denormalize.py - INFO - Reacquiring message with id: fa1da17f-57a6-4baf-a13a-18318c85afc8
2020-03-12 19:28:07,561 - denormalize.py - INFO - Reacquiring message with id: fa1da17f-57a6-4baf-a13a-18318c85afc8
2020-03-12 19:28:37,757 - denormalize.py - INFO - Reacquiring message with id: fa1da17f-57a6-4baf-a13a-18318c85afc8
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...