У меня есть служба, которая подписывается на PubSub topi c и использует асинхронное извлечение. После 10 минут бездействия и отсутствия сообщений PubSub выдает ошибку 504: превышен крайний срок.
Ошибка всегда возникает примерно через 10 минут. Все обнаруженные мной похожие проблемы были связаны с синхронным вытягиванием, а не с асинхронным вытягиванием, которое я использую.
Сообщение об ошибке:
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-terminating stream error 504 Deadline Exceeded
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed recoverable stream error 504 Deadline Exceeded
INFO:google.api_core.bidi:Re-established stream
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-terminating stream error 504 Deadline Exceeded
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed recoverable stream error 504 Deadline Exceeded
Класс подписчика:
from google.cloud import pubsub_v1
class Subscriber():
def __init__(self):
self.subscriber = pubsub_v1.SubscriberClient()
self.project_id = "my-project-id"
self.subscription_id = "SUBSCRIPTION_MAIL_RECIEVE"
self.subscription_path = self.subscriber.subscription_path(self.project_id,
self.subscription_id)
def subscribe(self, callback):
"""
Parameters:
callback (callable): function to be called for incoming messages on given topic
"""
streaming_pull_future = self.subscriber.subscribe(self.subscription_path,
callback=callback)
return streaming_pull_future
Прослушивание сообщений:
subscriber = Subscriber()
pull_future = subscriber.subscribe(my_callback_function(message))
with subscriber.subscriber:
try:
print("Waiting for messages")
pull_future.result()
except TimeoutError:
pull_future.cancel()