У меня есть логика подписки Pub / Sub, заключенная в метод подписки, который вызывается один раз при инициализации службы для каждой подписки:
def subscribe(self,
callback: typing.Callable,
subscription_name: str,
topic_name: str,
project_name: str = None) -> typing.Optional[SubscriberClient]:
"""Subscribes to Pub/Sub topic and return subscriber client
:param callback: subscription callback method
:param subscription_name: name of the subscription
:param topic_name: name of the topic
:param project_name: optional project name. Uses default project if not set
:return: subscriber client or None if testing
"""
project = project_name if project_name else self.pubsub_project_id
self.logger.info('Subscribing to project `{}`, topic `{}`'.format(project, topic_name))
project_path = self.pubsub_subscriber.project_path(project)
topic_path = self.pubsub_subscriber.topic_path(project, topic_name)
subscription_path = self.pubsub_subscriber.subscription_path(project, subscription_name)
# check if there is an existing subscription, if not, create it
if subscription_path not in [s.name for s in self.pubsub_subscriber.list_subscriptions(project_path)]:
self.logger.info('Creating new subscription `{}`, topic `{}`'.format(subscription_name, topic_name))
self.pubsub_subscriber.create_subscription(subscription_path, topic_path)
# subscribe to the topic
self.pubsub_subscriber.subscribe(
subscription_path, callback=callback,
scheduler=self.thread_scheduler
)
return self.pubsub_subscriber
Этот метод вызывается так:
self.subscribe_client = self.subscribe(
callback=self.pubsub_callback,
subscription_name='subscription_topic',
topic_name='topic'
)
Метод обратного вызова делает кучу вещей, отправляет 2 электронных письма, затем подтверждает сообщение
def pubsub_callback(self, data: gcloud_pubsub_subscriber.Message):
self.logger.debug('Processing pub sub message')
try:
self.do_something_with_message(data)
self.logger.debug('Acknowledging the message')
data.ack()
self.logger.debug('Acknowledged')
return
except:
self.logger.warning({
"message": "Failed to process Pub/Sub message",
"request_size": data.size,
"data": data.data
}, exc_info=True)
self.logger.debug('Acknowledging the message 2')
data.ack()
Когда я запускаю push что-то в подписку, выполняется обратный вызов, печатается все сообщения отладки, включая Acknowledged
. Сообщение, однако, остается в Pub / Sub, обратный вызов вызывается снова, и это занимает экспоненциальное время после каждой повторной попытки. Вопрос в том, что может привести к тому, что сообщение останется в pub / sub даже после вызова ack
?
У меня есть несколько таких подписок, все они работают как положено. Крайний срок - не вариант, обратный вызов заканчивается почти сразу, и я все равно играл с крайним сроком ack, ничего не помогло.
Когда я пытаюсь обработать эти сообщения от локально запущенного приложения, подключенного к этому pub-sub, он просто завершаетштраф и подтверждение выводит сообщение из очереди, как и ожидалось.
- Таким образом, проблема проявляется только в развернутой службе (запущенной в модуле kubernetes)
- Обратный вызов выполняет бак-ак, казалось бы, ничего не делает
- Получение сообщений от сценария, работающего локально (... и выполняющего те же действия) или через пользовательский интерфейс GCP, работает как ожидалось.
Есть идеи?