Повторное извлечение сообщения GCP PubSub Spring Boot - PullRequest
1 голос
/ 11 июня 2019

Мне нужна помощь с проблемой с gcp pub / sus.У меня есть процесс, который отправляет 100 сообщений с фильтрами в pubsub, и другое приложение (при загрузке) получает эти сообщения.Когда приложение весенней загрузки получает сообщение от pubsub (не pull), обрабатывает 100 сообщений, но при этом получает больше сообщений, в разное время получает разное количество сообщений, в любое время получает 120, еще 140 и другие больше 200.Я не нашел никакого решения этого, это мой код:

    @Bean
    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public MessageHandler messageReceiver() {
        return message -> {
            System.out.println("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
            //other process of app (call other api)
            AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
            consumer.ack();
        };
    }

пожалуйста, помогите мне !!!

1 Ответ

1 голос
/ 11 июня 2019

Дублирующиеся сообщения могут возникать по разным причинам в Google Cloud Pub / Sub. Следует иметь в виду, что Cloud Pub / Sub предлагает доставку хотя бы один раз, что означает, что всегда возможно некоторое количество дубликатов, поэтому ваше приложение должно быть устойчивым к ним. Хотя такое количество дубликатов кажется немного высоким. Как правило, дубликаты могут возникать по следующим причинам:

  1. Сообщения отправляются издателем более одного раза. Это может произойти, если издатель отключился от Cloud Pub / Sub и снова отправил то же сообщение. Если этот тип дублирования происходит, то сообщения будут иметь разные идентификаторы сообщений.
  2. Абоненту требуется слишком много времени для подтверждения сообщений. В вашем коде у вас есть //other process of app (call other api). Как долго длится этот процесс? Если он больше, чем крайний срок для подтверждения сообщения, то сообщение будет доставлено повторно. Имейте в виду, что, если этот другой процесс требует захвата блокировок для всех сообщений, может возникнуть проблема конкуренции из-за слишком большого числа запросов, пытающихся получить эти блокировки одновременно, что приведет к задержкам обработки. По умолчанию срок подтверждения сообщения составляет десять секунд. При использовании клиентской библиотеки Java крайний срок автоматически увеличивается на maxAckExtensionPeriod , который по умолчанию равен одному часу. Это свойство также можно установить в DefaultSubscriberFactory для Spring.
  3. Сообщения вообще не проверяются. Если исключение препятствует вызову ack или возникает тупик, в результате которого эта строка кода не будет достигнута, сообщение будет доставлено.
  4. Вариант использования - одно из большого количества невыполненных небольших сообщений . В этой ситуации буферы могут заполняться клиентом таким образом, что это приводит к повторной доставке сообщений.
...