Сообщения Google Cloud PubSub, не обработанные обратным вызовом - PullRequest
0 голосов
/ 05 сентября 2018

Я пытаюсь использовать Google PubSub для передачи и получения сообщений между двумя службами. Однако некоторые из отправленных сообщений, по-видимому, отбрасываются случайным образом и не обрабатываются методом обратного вызова подписчика.

При отправке сообщений около половины сообщений обрабатываются методом обратного вызова. Для другой половины метод обратного вызова, кажется, не вызывается вообще (информация не регистрируется). Однако сообщения по-прежнему исчезают из темы и не отправляются повторно.

Код, используемый для запуска абонента:

logger = logging.getLogger(LOGGER_NAME)
logger.info('Starting the pubsub subscriber')
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(GOOGLE_CLOUD_PROJECT, SUBSCRIPTION_NAME)
subscriber.subscribe(subscription_path, callback=callback)
while True:
    try:
        sleep(60)
    except Exception as e:
        // Log exception

Метод обратного вызова:

def callback(message):
    logger = logging.getLogger(LOGGER_NAME)
    logger.info(f'Recieved callback with message: {message}', extra = {'callback_message': message}  )
    // Process message

Ошибка, по-видимому, на стороне абонента. Сообщения отправляются от издателя, и если подписчик не подключен к теме, сообщения не исчезают.

Я пытался использовать управление потоком для контроля количества сообщений, получаемых подписчиком, но, похоже, это не оказывает никакого влияния.

Могут ли сообщения обрабатываться без вызова метода обратного вызова? Существуют ли другие причины, по которым сообщения могут исчезать из темы?

РЕДАКТИРОВАТЬ: Оказывается, что другой сервис считывал из той же подписки, обрабатывая пропущенные сообщения.

1 Ответ

0 голосов
/ 17 сентября 2018

Я знаю, что вы нашли ответ на свою проблему, но я подумал, что было бы целесообразно перечислить некоторые полезные шаги для отладки этого типа проблемы:

  1. Проверьте, чтобы сообщения действительно были опубликованы. Когда публикация завершается успешно, ответ должен содержать идентификатор сообщения, например, в виде строки, полученной APIFuture в методе Java Publish .
  2. Проверьте, не создается ли резерв сообщений. Вы можете просматривать subscription/oldest_unacked_message_age и subscription/num_undelivered_messages через Stackdriver .
  3. Проверьте, не установлен ли на вашем подписчике контроль потока , который мешает вам своевременно получать все сообщения. Если у вас установлено управление потоком данных, и оно препятствует доставке всех сообщений, вы, скорее всего, увидите, что количество недоставленных сообщений увеличивается в Stackdriver.
  4. Убедитесь, что у вас нет дополнительных клиентов, подписывающихся на сообщения с той же подпиской. Например, возможно, вы используете инструмент gcloud для извлечения и просмотра сообщений. В этой ситуации вы, вероятно, не увидите увеличения числа недоставленных сообщений в Stackdriver.

Если после проверки всего этого вы не уверены, что происходит с вашими сообщениями, лучше связаться со службой поддержки, указав название вашего проекта и подписку, а также идентификаторы всех сообщений, которые, по вашему мнению, не были доставлены.

...