Google PubSub и дублированные сообщения из ТЕМЫ - PullRequest
0 голосов
/ 18 декабря 2018

Как предотвратить дублирование сообщений в Google Cloud PubSub?

Скажем, у меня есть код, который обрабатывает сообщение, на которое он подписан.

Скажем, у меня 2 узла ста же Служба, которая имеет этот код.

Как только один получил сообщение, но еще не подтвердил его, другой узел получит то же сообщение.И вот где проблема в том, что у нас есть два дублированных сообщения .

void messageReceiver(PubsubMessage pubsubMessage, AckReplyConsumer ackReply) {

        submitHandler.handle(toMessage(pubsubMessage))
                .doOnSuccess((response) -> {
                    log.info("Acknowledging the successfully processed message id: {}, response {}", pubsubMessage.getMessageId(), response);
                    ackReply.ack();  // <---- acknowledged
                })
                .doOnError((e) -> {
                    log.error("Not acknowledging due to an exception", e);
                    ackReply.nack();
                })
                .doOnTerminate(span::finish)
                .subscribe();
    }

Какое решение для этого?Это нормальное поведение?

Ответы [ 3 ]

0 голосов
/ 31 мая 2019

Вы можете использовать Redis из Memorystore для дедупликации сообщений.Ваш издатель должен добавить трассировку iD в тело сообщения непосредственно перед публикацией его в PubSub.С другой стороны, клиент (подписчик) должен проверить, находится ли идентификатор кеша в кэше - пропустить сообщение.Если такого сообщения нет - обработайте сообщение и добавьте идентификатор трассировки в кэш с истечением 7-8 дней (крайний срок публикации PubSub - 7 дней).Таким простым способом Вы можете предоставить правильные полученные сообщения.

0 голосов
/ 22 июля 2019

Все сообщения в данной теме имеют уникальное поле messageID:

Идентификатор этого сообщения, назначаемый сервером при публикации сообщения. Гарантируется быть уникальным в теме.Это значение может быть прочитано подписчиком, который получает сообщение PubsubMessage с помощью вызова subscription.pull или принудительной доставки.Он не должен быть заполнен издателем в вызове themes.publish.

Вы можете использовать его для дедупликации входящих сообщений.Нет необходимости вручную назначать идентификатор.

В распределенных системах это немного сложнее (например, несколько экземпляров потребителей для данной подписки).Вам потребуется глобальный механизм синхронизации, самый простой - настроить базу данных (например, Redis) и использовать ее для хранения идентификаторов обработанных сообщений.

Вам следует взглянуть на Воспроизведение и удаление сообщений в котором описано, как настроить сохранение сообщений.

Существует два свойства подписки:

  • retain_acked_messages - сохранять сообщения подтверждения,
  • message_retention_duration - как долгохранить сообщения.

Если вы не планируете перематывать свою подписку на прошлый момент времени, например, если вы не планируете повторную обработку сообщений или у вас есть ошибки, заставляющие вас сбросить подписку, вы можете установитьretain_acked_messages=false и message_retention_duration='3600s'.Это позволит вам сохранять только идентификаторы сообщений последнего часа.

Имейте в виду, что сообщение PubSub также имеет publish_time, поэтому вам не нужно добавлять его в данные вашего сообщения.Может использоваться с message_id.Оба они устанавливаются сервером PubSub при получении сообщения.

0 голосов
/ 18 декабря 2018

Google Cloud Pub / Sub использует доставку «По крайней мере, один раз».Начиная с Документы :

Как правило, Cloud Pub / Sub доставляет каждое сообщение один раз и в том порядке, в котором оно было опубликовано.Однако сообщения могут иногда доставляться не по порядку или более одного раза.В целом, для обеспечения доставки более одного раза требуется, чтобы ваш подписчик был идемпотентным при обработке сообщений.

Это означает, что он гарантирует, что доставит сообщение 1: N раз, поэтомувы можете получить сообщение несколько раз, если не передадите его через что-то еще, что сначала его дедуплицирует.Нет настройки, которую вы можете определить, чтобы гарантировать точную доставку.В документах указано, что вы можете получить желаемое поведение, используя PubSubIO Cloud Dataflow, но такое решение устарело :

Вы можете достичь ровно одной обработки Cloud Pub/ Подпотоки сообщений с использованием Cloud Dataflow PubsubIO.PubsubIO не дублирует сообщения на пользовательских идентификаторах сообщений или назначенных Cloud Pub / Sub.

Говоря все это, я никогда не видел на самом деле Google Cloud Pub / Sub sendсообщение дважды.Вы уверены, что это действительно ваша проблема, или сообщение переиздается, потому что вы не подтверждаете сообщение в течение Крайнего срока подтверждения (как вы указали выше, по умолчанию это 10 секунд).Если вы не признаете это, оно будет переиздано.Начиная с документы (выделение мое) :

Подписка создается на одну тему.Он имеет несколько свойств, которые могут быть установлены во время создания или обновлены позже, в том числе:

  • срок подтверждения : Если ваш код не подтверждает сообщение досрок, сообщение отправляется снова. По умолчанию 10 секунд.Максимальный настраиваемый конечный срок, который вы можете указать, составляет 600 секунд (10 минут).

Если это так, просто подтвердите свои сообщения в указанный срок, и вы не увидите эти дубликаты так часто.

...