У меня есть приложение, развернутое в GKE, разделенное на разные микросервисы. Один из микросервисов, назовем его «рабочий», получает задачи для выполнения из сообщений pubsub.
Выполнение задач может занять до 1 часа. Обычный крайний срок подтверждения сообщений Google pubsub довольно короткий, мы обновляем его каждые 10 секунд до его окончания. Вот фрагмент кода, ответственный за это:
def watchdog(businessDoneEvent, subscription, ack_deadline, message, ack_id):
'''
Prevents message from being republished as long as computation is
running
'''
while True:
# Wait (defaultDeadline - 10) seconds before renewing if defaultDeadline
# is > 5 seconds; renewed every second otherwise
sleepTime = ack_deadline - 10 if ack_deadline > 10 else 1
startTime = time.time()
while time.time() - startTime < sleepTime:
LOGGER.info('Sleeping time: {} - ack_deadline: {}'.format(time.time() - startTime, ack_deadline))
if businessDoneEvent.isSet():
LOGGER.info('Business done!')
return
time.sleep(1)
subscriber = SubscriberClient()
LOGGER.info('Modifying ack deadline for message ' +
str(message.data) + ' processing to ' +
str(ack_deadline))
subscriber.modify_ack_deadline(subscription, [ack_id],
ack_deadline)
Как только выполнение завершено, мы достигаем этого фрагмента кода:
def callbackWrapper(callback,
subscription,
message,
ack_id,
endpoint,
context,
subscriber,
postAcknowledgmentCallback=None):
'''
Pub/sub message acknowledgment if everything ran correctly
'''
try:
callback(message.data, endpoint, context, **message.attributes)
except Exception as e:
LOGGER.info(message.data)
LOGGER.error(traceback.format_exc())
raise e
else:
LOGGER.info("Trying to acknowledge...")
my_retry = Retry(predicate=if_exception_type(ServiceUnavailable), deadline=3600)
subscriber.acknowledge(subscription, [ack_id], retry=my_retry)
LOGGER.info(str(ack_id) + ' has been acknowledged')
if postAcknowledgmentCallback is not None:
postAcknowledgmentCallback(message.data,
**message.attributes)
Обратите внимание, что мы также используем этот код в большинстве из наших микросервисов, и он работает просто отлично.
Моя проблема в том, что, хотя я не получаю никакой ошибки из этого кода, и кажется, что запрос подтверждения отправляется правильно, он фактически подтверждается позже. Например, согласно консоли GCP, сейчас у меня есть 8 неподтвержденных сообщений, но у меня должно быть только 3. Также было сказано, что есть 12, когда мне нужно только 5 в течение часа:
У меня есть горизонтальный автоскейлер, использующий pubsub metri c. Когда стручки сделаны, они не уменьшаются, или только через 1 час или более. Это создает некоторые бесполезные затраты, которых я бы хотел избежать.
Кто-нибудь имеет представление о том, почему это происходит?