Google pubsub позднее подтверждение - PullRequest
0 голосов
/ 13 февраля 2020

У меня есть приложение, развернутое в GKE, разделенное на разные микросервисы. Один из микросервисов, назовем его «рабочий», получает задачи для выполнения из сообщений pubsub.

Выполнение задач может занять до 1 часа. Обычный крайний срок подтверждения сообщений Google pubsub довольно короткий, мы обновляем его каждые 10 секунд до его окончания. Вот фрагмент кода, ответственный за это:

def watchdog(businessDoneEvent, subscription, ack_deadline, message, ack_id):
    '''
        Prevents message from being republished as long as computation is 
        running
    '''
    while True:
        # Wait (defaultDeadline - 10) seconds before renewing if defaultDeadline
        # is > 5 seconds; renewed every second otherwise
        sleepTime = ack_deadline - 10 if ack_deadline > 10 else 1
        startTime = time.time()
        while time.time() - startTime < sleepTime:
            LOGGER.info('Sleeping time: {} - ack_deadline: {}'.format(time.time() - startTime, ack_deadline))
            if businessDoneEvent.isSet():
                LOGGER.info('Business done!')
                return
            time.sleep(1)

        subscriber = SubscriberClient()

        LOGGER.info('Modifying ack deadline for message ' + 
                          str(message.data) + ' processing to ' +
                          str(ack_deadline))
        subscriber.modify_ack_deadline(subscription, [ack_id],
                                         ack_deadline)

Как только выполнение завершено, мы достигаем этого фрагмента кода:

def callbackWrapper(callback,
                    subscription,
                    message,
                    ack_id,
                    endpoint,
                    context,
                    subscriber,
                    postAcknowledgmentCallback=None):
    '''
        Pub/sub message acknowledgment if everything ran correctly
    '''
    try:
        callback(message.data, endpoint, context, **message.attributes)
    except Exception as e:
        LOGGER.info(message.data)
        LOGGER.error(traceback.format_exc())
        raise e
    else:
        LOGGER.info("Trying to acknowledge...")
        my_retry = Retry(predicate=if_exception_type(ServiceUnavailable), deadline=3600)
        subscriber.acknowledge(subscription, [ack_id], retry=my_retry)
        LOGGER.info(str(ack_id) + ' has been acknowledged')
        if postAcknowledgmentCallback is not None:
            postAcknowledgmentCallback(message.data, 
                                       **message.attributes)

Обратите внимание, что мы также используем этот код в большинстве из наших микросервисов, и он работает просто отлично.

Моя проблема в том, что, хотя я не получаю никакой ошибки из этого кода, и кажется, что запрос подтверждения отправляется правильно, он фактически подтверждается позже. Например, согласно консоли GCP, сейчас у меня есть 8 неподтвержденных сообщений, но у меня должно быть только 3. Также было сказано, что есть 12, когда мне нужно только 5 в течение часа: enter image description here

У меня есть горизонтальный автоскейлер, использующий pubsub metri c. Когда стручки сделаны, они не уменьшаются, или только через 1 час или более. Это создает некоторые бесполезные затраты, которых я бы хотел избежать.

Кто-нибудь имеет представление о том, почему это происходит?

...