Сообщение GCP остается в Pub / Sub после подтверждения - PullRequest
0 голосов
/ 08 октября 2019

У меня есть логика подписки Pub / Sub, заключенная в метод подписки, который вызывается один раз при инициализации службы для каждой подписки:

    def subscribe(self,
                  callback: typing.Callable,
                  subscription_name: str,
                  topic_name: str,
                  project_name: str = None) -> typing.Optional[SubscriberClient]:
        """Subscribes to Pub/Sub topic and return subscriber client

        :param callback: subscription callback method
        :param subscription_name: name of the subscription
        :param topic_name: name of the topic
        :param project_name: optional project name. Uses default project if not set
        :return: subscriber client or None if testing
        """
        project = project_name if project_name else self.pubsub_project_id
        self.logger.info('Subscribing to project `{}`, topic `{}`'.format(project, topic_name))

        project_path = self.pubsub_subscriber.project_path(project)
        topic_path = self.pubsub_subscriber.topic_path(project, topic_name)
        subscription_path = self.pubsub_subscriber.subscription_path(project, subscription_name)

        # check if there is an existing subscription, if not, create it
        if subscription_path not in [s.name for s in self.pubsub_subscriber.list_subscriptions(project_path)]:
            self.logger.info('Creating new subscription `{}`, topic `{}`'.format(subscription_name, topic_name))
            self.pubsub_subscriber.create_subscription(subscription_path, topic_path)

        # subscribe to the topic
        self.pubsub_subscriber.subscribe(
            subscription_path, callback=callback,
            scheduler=self.thread_scheduler
        )
        return self.pubsub_subscriber

Этот метод вызывается так:

        self.subscribe_client = self.subscribe(
            callback=self.pubsub_callback,
            subscription_name='subscription_topic',
            topic_name='topic'
        )

Метод обратного вызова делает кучу вещей, отправляет 2 электронных письма, затем подтверждает сообщение

    def pubsub_callback(self, data: gcloud_pubsub_subscriber.Message):
        self.logger.debug('Processing pub sub message')

        try:
            self.do_something_with_message(data)

            self.logger.debug('Acknowledging the message')
            data.ack()
            self.logger.debug('Acknowledged')
            return

        except:
            self.logger.warning({
                "message": "Failed to process Pub/Sub message",
                "request_size": data.size,
                "data": data.data
            }, exc_info=True)

        self.logger.debug('Acknowledging the message 2')
        data.ack()

Когда я запускаю push что-то в подписку, выполняется обратный вызов, печатается все сообщения отладки, включая Acknowledged. Сообщение, однако, остается в Pub / Sub, обратный вызов вызывается снова, и это занимает экспоненциальное время после каждой повторной попытки. Вопрос в том, что может привести к тому, что сообщение останется в pub / sub даже после вызова ack?

У меня есть несколько таких подписок, все они работают как положено. Крайний срок - не вариант, обратный вызов заканчивается почти сразу, и я все равно играл с крайним сроком ack, ничего не помогло.

Когда я пытаюсь обработать эти сообщения от локально запущенного приложения, подключенного к этому pub-sub, он просто завершаетштраф и подтверждение выводит сообщение из очереди, как и ожидалось.

  • Таким образом, проблема проявляется только в развернутой службе (запущенной в модуле kubernetes)
  • Обратный вызов выполняет бак-ак, казалось бы, ничего не делает
  • Получение сообщений от сценария, работающего локально (... и выполняющего те же действия) или через пользовательский интерфейс GCP, работает как ожидалось.

Есть идеи?

Ответы [ 2 ]

1 голос
/ 10 октября 2019

Подтверждения лучше всего подходят в Pub / Sub, поэтому возможна, но необычная возможность доставки сообщений.

Если вы постоянно получаете дубликаты, это может быть связано с дублированием публикаций одного и того же содержимого сообщения. Что касается Pub / Sub, то это разные сообщения, и им будут назначены разные идентификаторы сообщений. Проверьте предоставленные Pub / Sub идентификаторы сообщений, чтобы убедиться, что вы на самом деле получаете одно и то же сообщение несколько раз.

Существует крайний случай при работе с большими резервными копиями небольших сообщений с потоковой передачей (это то, что использует клиентская библиотека Python). Если вы используете несколько клиентов, подписывающихся на одну и ту же подписку, этот пограничный случай может быть уместным.

Вы также можете проверить Метрики Stackdriver вашей подписки , чтобы увидеть:

  • если его подтверждения были успешно отправлены (subscription/ack_message_count)
  • если его отставание уменьшается (subscription/backlog_bytes)
  • если ваш подписчик пропустил крайний срок подтверждения (subscription/streaming_pull_ack_message_operation_count отфильтровано с помощью response_code != "success")

Если вы не пропустили крайний срок подтверждения и ваше отставание остается неизменным, вам следует обратиться в службу поддержки Google Cloud с указанием названия проекта, имени подписки и образца идентификаторов дубликатов сообщений. ,Они смогут выяснить, почему происходят эти дубликаты.

0 голосов
/ 17 октября 2019

Я провел дополнительное тестирование и наконец нашел проблему.

TL; DR: я использовал один и тот же google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler для всех подписок.

Вот фрагменты кода, который я использовал для его тестирования. Это неверная версия:

server.py

import concurrent.futures.thread
import os
import time

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler


def create_subscription(project_id, topic_name, subscription_name):
    """Create a new pull subscription on the given topic."""
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = subscriber.topic_path(project_id, topic_name)
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    subscription = subscriber.create_subscription(
        subscription_path, topic_path)

    print('Subscription created: {}'.format(subscription))


def receive_messages(project_id, subscription_name, t_scheduler):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message.data))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback, scheduler=t_scheduler)
    print('Listening for messages on {}'.format(subscription_path))


project_id = os.getenv("PUBSUB_PROJECT_ID")

publisher = pubsub_v1.PublisherClient()
project_path = publisher.project_path(project_id)

# Create both topics
try:
    topics = [topic.name.split('/')[-1] for topic in publisher.list_topics(project_path)]
    if 'topic_a' not in topics:
        publisher.create_topic(publisher.topic_path(project_id, 'topic_a'))
    if 'topic_b' not in topics:
        publisher.create_topic(publisher.topic_path(project_id, 'topic_b'))
except AlreadyExists:
    print('Topics already exists')

# Create subscriptions on both topics
sub_client = pubsub_v1.SubscriberClient()
project_path = sub_client.project_path(project_id)

try:
    subs = [sub.name.split('/')[-1] for sub in sub_client.list_subscriptions(project_path)]
    if 'topic_a_sub' not in subs:
        create_subscription(project_id, 'topic_a', 'topic_a_sub')
    if 'topic_b_sub' not in subs:
        create_subscription(project_id, 'topic_b', 'topic_b_sub')
except AlreadyExists:
    print('Subscriptions already exists')

scheduler = ThreadScheduler(concurrent.futures.thread.ThreadPoolExecutor(10))

receive_messages(project_id, 'topic_a_sub', scheduler)
receive_messages(project_id, 'topic_b_sub', scheduler)

while True:
    time.sleep(60)

client.py

import datetime
import os
import random
import sys
from time import sleep

from google.cloud import pubsub_v1


def publish_messages(pid, topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(pid, topic_name)

    for n in range(1, 10):
        data = '[{} - {}] Message number {}'.format(datetime.datetime.now().isoformat(), topic_name, n)
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)
        sleep(random.randint(10, 50) / 10.0)


project_id = os.getenv("PUBSUB_PROJECT_ID")
publish_messages(project_id, sys.argv[1])

Я подключенВ облачном пабе / сабе сервер создавал темы и подписки. Затем я запускал клиентский скрипт несколько раз параллельно для обеих тем. Через некоторое время, после того как я изменил код сервера для создания нового планировщика потоков в области действия receive_messages, сервер очистил обе темы и функционировал, как и ожидалось.

Смущает то, что в любом случае сервер печаталполученное сообщение для всех сообщений.

Я собираюсь опубликовать это на https://github.com/googleapis/google-cloud-python/issues

...