Google PubSub - перезапуск подписки после возникновения исключения - PullRequest
0 голосов
/ 10 февраля 2020

Добрый день,

Я выполняю несколько длительных асин c заданий, используя PubSub для запуска функции. Изредка задача может быть неудачной. В таких случаях я просто хочу зарегистрировать исключение, подтвердить сообщение и перезапустить подписку, чтобы подписчик продолжал получать новые сообщения после сбоя.

Я поместил некоторый упрощенный код для демонстрации моя текущая настройка ниже:

try:
    while True:
        streaming_pull_future = workers.subscriber.subscribe(
            subscription_path, callback=worker_task <- includes logic to ack() the message if it's failed before
        )

        print(f'Listening for messages on {subscription_path}')

        try:
            streaming_pull_future.result()
        except Exception as e: 
            print(streaming_pull_future.cancelled()) #<-- this evaluates to false
            streaming_pull_future.cancel() #<-- this results in RunTimeError: set_result can only be called once.
            print(e)

except KeyboardInterrupt: # seems to be an issue as per Github PubSub issue #17. No keyboard interrupt
    streaming_pull_future.cancel() 

Я продолжаю видеть RuntimeError: set_result can only be called once, когда я выполняю streaming_pull_future.cancel () в обработчике исключений. Я проверил, возможно ли, что подписчик уже был отменен, но когда я вышел из системы, он оценил ее как False. Но когда я затем вызываю метод cancel (), я получаю сообщение об ошибке. Я хочу убедиться, что все потоки очищены перед созданием новой подписки, если у меня может быть несколько ошибок. Кто-нибудь знает, почему это происходит и как обойти это?

Я работаю Python 3.7.4 с PubSub 1.2.0 и grpcio 1.27.1.


Обновление:

Согласно комментариям, пожалуйста, посмотрите воспроизводимый пример. Полученная трассировка стека включена:

    Listening for messages on projects/trigger-web-app/subscriptions/load-job-sub
968432700946405
Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
  File "C:\..\lib\site-packages\google\cloud\pubsub_v1\subscriber\_protocol\streaming_pull_manager.py", line 
71, in _wrap_callback_errors
    callback(message)
  File "test.py", line 19, in worker_task
    a = 1/0 # cause an exception to be raised
ZeroDivisionError: division by zero
968424309156485
Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
  File "C:\...\lib\site-packages\google\cloud\pubsub_v1\subscriber\_protocol\streaming_pull_manager.py", line 
71, in _wrap_callback_errors
    callback(message)
  File "test.py", line 19, in worker_task
    a = 1/0 # cause an exception to be raised
ZeroDivisionError: division by zero
Traceback (most recent call last):
  File "test.py", line 29, in main
    streaming_pull_future.result()
  File "C:...\lib\site-packages\google\cloud\pubsub_v1\futures.py", line 105, in result
    raise err
  File "C:\...\lib\site-packages\google\cloud\pubsub_v1\subscriber\_protocol\streaming_pull_manager.py", line 
71, in _wrap_callback_errors
    callback(message)
  File "test.py", line 19, in worker_task
    a = 1/0 # cause an exception to be raised
ZeroDivisionError: division by zero

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "test.py", line 35, in <module>
    main()
  File "test.py", line 31, in main
    streaming_pull_future.cancel()  
  File "C:\...\lib\site-packages\google\cloud\pubsub_v1\subscriber\futures.py", line 46, in cancel
    return self._manager.close()
  File "C:\...\lib\site-packages\google\cloud\pubsub_v1\subscriber\_protocol\streaming_pull_manager.py", line 
496, in close
    callback(self, reason)
  File "C:\...\lib\site-packages\google\cloud\pubsub_v1\subscriber\futures.py", line 37, in _on_close_callback
    self.set_result(True)
  File "C:\...\lib\site-packages\google\cloud\pubsub_v1\futures.py", line 155, in set_result
    raise RuntimeError("set_result can only be called once.")
RuntimeError: set_result can only be called once.
import os
from google.cloud import pubsub_v1

subscriber  = pubsub_v1.SubscriberClient()

project_id=os.environ['GOOGLE_CLOUD_PROJECT']
subscription_name=os.environ['GOOGLE_CLOUD_PUBSUB_SUBSCRIPTION_NAME']
subscription_path = f'projects/{project_id}/subscriptions/{subscription_name}'

def worker_task( message ):

    job_id = message.message_id
    print(job_id)
    a = 1/0 # cause an exception to be raised
    message.ack()


def main():
    streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=worker_task
        )

    print(f'Listening for messages on {subscription_path}')

    try:
        streaming_pull_future.result()
    except Exception as e:  
        streaming_pull_future.cancel()  # if exception in callback handler, this will raise a RunTimError
        print(e)

if __name__ == '__main__':
    main()

Спасибо.

...