Добрый день,
Я выполняю несколько длительных асин c заданий, используя PubSub для запуска функции. Изредка задача может быть неудачной. В таких случаях я просто хочу зарегистрировать исключение, подтвердить сообщение и перезапустить подписку, чтобы подписчик продолжал получать новые сообщения после сбоя.
Я поместил некоторый упрощенный код для демонстрации моя текущая настройка ниже:
try:
while True:
streaming_pull_future = workers.subscriber.subscribe(
subscription_path, callback=worker_task <- includes logic to ack() the message if it's failed before
)
print(f'Listening for messages on {subscription_path}')
try:
streaming_pull_future.result()
except Exception as e:
print(streaming_pull_future.cancelled()) #<-- this evaluates to false
streaming_pull_future.cancel() #<-- this results in RunTimeError: set_result can only be called once.
print(e)
except KeyboardInterrupt: # seems to be an issue as per Github PubSub issue #17. No keyboard interrupt
streaming_pull_future.cancel()
Я продолжаю видеть RuntimeError: set_result can only be called once
, когда я выполняю streaming_pull_future.cancel () в обработчике исключений. Я проверил, возможно ли, что подписчик уже был отменен, но когда я вышел из системы, он оценил ее как False. Но когда я затем вызываю метод cancel (), я получаю сообщение об ошибке. Я хочу убедиться, что все потоки очищены перед созданием новой подписки, если у меня может быть несколько ошибок. Кто-нибудь знает, почему это происходит и как обойти это?
Я работаю Python 3.7.4 с PubSub 1.2.0 и grpcio 1.27.1.
Обновление:
Согласно комментариям, пожалуйста, посмотрите воспроизводимый пример. Полученная трассировка стека включена:
Listening for messages on projects/trigger-web-app/subscriptions/load-job-sub
968432700946405
Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
File "C:\..\lib\site-packages\google\cloud\pubsub_v1\subscriber\_protocol\streaming_pull_manager.py", line
71, in _wrap_callback_errors
callback(message)
File "test.py", line 19, in worker_task
a = 1/0 # cause an exception to be raised
ZeroDivisionError: division by zero
968424309156485
Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
File "C:\...\lib\site-packages\google\cloud\pubsub_v1\subscriber\_protocol\streaming_pull_manager.py", line
71, in _wrap_callback_errors
callback(message)
File "test.py", line 19, in worker_task
a = 1/0 # cause an exception to be raised
ZeroDivisionError: division by zero
Traceback (most recent call last):
File "test.py", line 29, in main
streaming_pull_future.result()
File "C:...\lib\site-packages\google\cloud\pubsub_v1\futures.py", line 105, in result
raise err
File "C:\...\lib\site-packages\google\cloud\pubsub_v1\subscriber\_protocol\streaming_pull_manager.py", line
71, in _wrap_callback_errors
callback(message)
File "test.py", line 19, in worker_task
a = 1/0 # cause an exception to be raised
ZeroDivisionError: division by zero
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "test.py", line 35, in <module>
main()
File "test.py", line 31, in main
streaming_pull_future.cancel()
File "C:\...\lib\site-packages\google\cloud\pubsub_v1\subscriber\futures.py", line 46, in cancel
return self._manager.close()
File "C:\...\lib\site-packages\google\cloud\pubsub_v1\subscriber\_protocol\streaming_pull_manager.py", line
496, in close
callback(self, reason)
File "C:\...\lib\site-packages\google\cloud\pubsub_v1\subscriber\futures.py", line 37, in _on_close_callback
self.set_result(True)
File "C:\...\lib\site-packages\google\cloud\pubsub_v1\futures.py", line 155, in set_result
raise RuntimeError("set_result can only be called once.")
RuntimeError: set_result can only be called once.
import os
from google.cloud import pubsub_v1
subscriber = pubsub_v1.SubscriberClient()
project_id=os.environ['GOOGLE_CLOUD_PROJECT']
subscription_name=os.environ['GOOGLE_CLOUD_PUBSUB_SUBSCRIPTION_NAME']
subscription_path = f'projects/{project_id}/subscriptions/{subscription_name}'
def worker_task( message ):
job_id = message.message_id
print(job_id)
a = 1/0 # cause an exception to be raised
message.ack()
def main():
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=worker_task
)
print(f'Listening for messages on {subscription_path}')
try:
streaming_pull_future.result()
except Exception as e:
streaming_pull_future.cancel() # if exception in callback handler, this will raise a RunTimError
print(e)
if __name__ == '__main__':
main()
Спасибо.