Не знаю, что происходит в последнее время, но мы столкнулись с несколькими проблемами с PubSub + GCF.
Мы публикуем sh 100K больших сообщений, используя Python Client lib. Мы подтверждаем это с помощью возвращенных message_id.
Однако подписанный фоновый GCF (Python) не срабатывает постоянно. Для 500 сообщений или около того (без шаблона):
- функция выполняется полностью, но стандартный вывод отсутствует в журналах SD. Он показывает только, например,
Function execution started
Function execution took 1634 ms, finished with status: 'ok'
Функция выполняется частично и дает сбой в любом месте кода, например, сразу после самого первого оператора печати
Function execution started
key:3330275270010019
Function execution took 54 ms, finished with status: 'crash'
Первая проблема - проблема GCF (или ведение журнала SD?), Поскольку stdout
всегда должен отразить в SD - см. https://cloud.google.com/functions/docs/monitoring/logging
Второй случай вызывает большее беспокойство, поскольку сообщение подтверждается даже при сбое функции - согласно * примечанию на https://cloud.google.com/functions/docs/calling/pubsub#sample_code, этого не должно быть.
Мы хотим придерживаться print()
, потому что журналы SD показывают контекст execution_id
. Используя gcloud logging
, он записывает выходные данные под общим c 'глобальным' типом ресурса, поэтому мы не можем ie всех выходных данных.
Правильное ведение журнала было давней проблемой с GCF Python, но print()
не работает - это новость.
Для PubSub, как " успешное выполнение функции"определено?" Если это просто возможность запустить / вызвать функцию, это (очень) вводит в заблуждение, и в документации + образец кода должно быть указано: всегда развертывать с опцией «повтора».
Хотя в идеале , PubSub должен снова отправить sh сообщение в случае сбоя функции, чтобы оно не потерялось.
Есть ли у кого-нибудь лучший опыт использования подписчика функции HTTP или PubSub Lite?
EDIT - функция является общей функцией цепочки c, но по запросу:
import base64
import json
from google.cloud import pubsub_v1
import requests
GCP_PROJECT = os.getenv("GCP_PROJECT")
OUT_TOPIC = f'projects/{GCP_PROJECT}/topics/taxcalcs.out'
session = requests.Session()
publisher = pubsub_v1.PublisherClient()
def process_taxcalcs(event, context):
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
parsed = json.loads(pubsub_message)
print('key:' + parsed['key']) # to count in bq sink
print('in_message_id:' + context.event_id)
url = f"http://{event['attributes']['host_port']}/jsoninvoice"
response = session.post(url, data=pubsub_message)
print(f'taxcalc_duration:{response.elapsed}')
attrs = event['attributes']
future = publisher.publish(OUT_TOPIC, response.json.encode('utf-8'), **attrs)
print('out_message_id:' + future.result())
return
У нас точно такая же проблема в следующей функции.