Как можно поймать исключения в подписчике Python PubSub, которые происходят во внутренних / библиотечных потоках? - PullRequest
3 голосов
/ 06 апреля 2019

Я использую потребителей pubsub, которые обрабатывают входящие сообщения со скоростью около одного в секунду. В целом все работает нормально, однако каждые несколько дней или часов мы иногда видим исключения, создаваемые внутренними потоками модулей pubsub, и мне не ясно, как их перехватить. Вот типичный пример (встречаются и другие похожие следы с немного отличающимися сообщениями):

Exception in thread Thread-LeaseMaintainer:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/grpc/_channel.py", line 549, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.6/site-packages/grpc/_channel.py", line 466, in _end_unary_response_blocking
    raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "channel is in state TRANSIENT_FAILURE"
    debug_error_string = "{"created":"@1554568036.075280756","description":"channel is in state TRANSIENT_FAILURE","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":2294,"grpc_status":14}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/google/api_core/retry.py", line 179, in retry_target
    return target()
  File "/usr/local/lib/python3.6/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.ServiceUnavailable: 503 channel is in state TRANSIENT_FAILURE

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py", line 146, in maintain_leases
    [requests.ModAckRequest(ack_id, p99) for ack_id in ack_ids]
  File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py", line 152, in modify_ack_deadline
    self._manager.send(request)
  File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 268, in send
    self._send_unary_request(request)
  File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 259, in _send_unary_request
    ack_deadline_seconds=deadline,
  File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/_gapic.py", line 45, in <lambda>
    fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw)  # noqa
  File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/gapic/subscriber_client.py", line 723, in modify_ack_deadline
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/usr/local/lib/python3.6/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/google/api_core/retry.py", line 270, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.6/site-packages/google/api_core/retry.py", line 199, in retry_target
    last_exc,
  File "<string>", line 3, in raise_from
google.api_core.exceptions.RetryError: Deadline of 600.0s exceeded while calling functools.partial(<function _wrap_unary_errors.<locals>.error_remapped_callable at 0x7f86228cd400>, subscription: "projects/xxxxx-dev/subscriptions/telemetry-sub"
ack_deadline_seconds: 10
ack_ids: "QBJMJwFESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRoHIGoKOUJdEmJoXFx1B1ALEHQoYnxvWRYFCEdReF1YHQdodGxXOFUEHnN1Y3xtWhQDAEFXf3f8gIrJ38BtZho9WxJLLD5-LDRFQV4"
, metadata=[('x-goog-api-client', 'gl-python/3.6.8 grpc/1.19.0 gax/1.8.2 gapic/0.40.0')]), last exception: 503 channel is in state TRANSIENT_FAILURE

Thread-ConsumeBidirectionalStream caught unexpected exception Deadline of 600.0s exceeded while calling functools.partial(<function _wrap_unary_errors.<locals>.error_remapped_callable at 0x7f86228cda60>, subscription: "projects/xxxxx-dev/subscriptions/telemetry-sub"
ack_deadline_seconds: 10
ack_ids: "QBJMJwFESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRoHIGoKOUJdEmJoXFx1B1ALEHQoYnxvWRYFCEdReF1YHAdodGxXOFUEHnN1aXVoWxAIBEdXeXf8gIrJ38BtZho9WxJLLD5-LDRFQV4"
, metadata=[('x-goog-api-client', 'gl-python/3.6.8 grpc/1.19.0 gax/1.8.2 gapic/0.40.0')]), last exception: 503 channel is in state TRANSIENT_FAILURE and will exit.

(В данном конкретном случае подписчик получил несколько аналогичных ошибок и вообще прекратил потребление сообщений (без выхода из основного потока), хотя в случае других таких библиотечных ошибок наблюдалось другое поведение.)

Наш код выглядит примерно так (с некоторым упрощением):

client = pubsub_v1.SubscriberClient()
flow_control = pubsub_v1.types.FlowControl(max_messages=500)
future = client.subscribe(subscription_path, callback=callback, 
     flow_control=flow_control)
...
    try:
        future.result(timeout=1)
    except pubsub_v1.exceptions.TimeoutError:
        pass
    except _Rendezvous as exc:
        logger.error('Got Rendezvous error in subscriber. Retrying in 1s. '
                     f'Detail: {exc}')
        # NEVER GET HERE
        time.sleep(1)
        continue
    except RetryError as exc:
        logger.error('Got RetryError in subscriber. Retrying in 1s. '
                     f'Detail: {exc}')
        # NEVER GET HERE
        time.sleep(1)
        continue
    except Exception as exc:  # pylint: disable=broad-except
        logger.exception('Got uncaught exception in subscriber or callback ...')
        # NEVER GET HERE

Ни одна из ветвей исключений не используется. Это заставляет меня думать, что исключение происходит в некотором фоновом потоке и не сообщается вызовом future.result (), несмотря на документацию ( GCP PubSub Docs ) об обратном. Я прочитал несколько проблем с GitHub, просмотрел документацию и проанализировал вопросы SO, связанные с этим, но я не смог найти решение, которое на самом деле ловит такие ошибки. Буду признателен за любую помощь или совет.

Версия:

  • python == 3.6.5
  • google-cloud-pubsub == 0.40.0 # но это повело себя аналогично для как минимум нескольких последних версий
  • google-api-core == 1.8.2
  • google-api-python-client == 1.7.8

1 Ответ

1 голос
/ 09 апреля 2019

Нет необходимости отлавливать эти внутренние ошибки и обрабатывать их.Это временные ошибки при подключении библиотеки к сервису Google Cloud Pub / Sub, и сама библиотека должна их обрабатывать.Если это не так - и, в частности, если он предотвращает получение дальнейших сообщений без перезапуска, то это ошибка, и в репо GitHub для клиентской библиотеки Python .

следует указать проблему.
...