У меня есть облачная функция Google, запускаемая PubSub. Сообщения о состоянии do c подтверждаются после успешного завершения функции. ссылка
Но случайным образом функция повторяется (тот же идентификатор выполнения) ровно через 10 минут после выполнения. Это максимальное время ожидания PubSub.
Я также пытался получить идентификатор сообщения и подтвердить его программным способом в коде функции, но API PubSub отвечает, что сообщения с таким идентификатором нет. В мониторинге StackDriver я вижу, что некоторые сообщения не подтверждаются.
Вот мой код: main.py
import base64
import logging
import traceback
from google.api_core import exceptions
from google.cloud import bigquery, error_reporting, firestore, pubsub
from sql_runner.runner import orchestrator
logging.getLogger().setLevel(logging.INFO)
def main(event, context):
bigquery_client = bigquery.Client()
firestore_client = firestore.Client()
publisher_client = pubsub.PublisherClient()
subscriber_client = pubsub.SubscriberClient()
logging.info(
'event=%s',
event
)
logging.info(
'context=%s',
context
)
try:
query_id = base64.b64decode(event.get('data',b'')).decode('utf-8')
logging.info(
'query_id=%s',
query_id
)
# inject dependencies
orchestrator(
query_id,
bigquery_client,
firestore_client,
publisher_client
)
sub_path = (context.resource['name']
.replace('topics', 'subscriptions')
.replace('function-sql-runner', 'gcf-sql-runner-europe-west1-function-sql-runner')
)
# explicitly ack message to avoid duplicates invocations
try:
subscriber_client.acknowledge(
sub_path,
[context.event_id] # message_id to ack
)
logging.warning(
'message_id %s acknowledged (FORCED)',
context.event_id
)
except exceptions.InvalidArgument as err:
# google.api_core.exceptions.InvalidArgument: 400 You have passed an invalid ack ID to the service (ack_id=982967258971474).
logging.info(
'message_id %s already acknowledged',
context.event_id
)
logging.debug(err)
except Exception as err:
# catch all exceptions and log to prevent cold boot
# report with error_reporting
error_reporting.Client().report_exception()
logging.critical(
'Internal error : %s -> %s',
str(err),
traceback.format_exc()
)
if __name__ == '__main__': # for testing
from collections import namedtuple # use namedtuple to avoid Class creation
Context = namedtuple('Context', 'event_id resource')
context = Context('666', {'name': 'projects/my-dev/topics/function-sql-runner'})
script_to_start = b' ' # launch the 1st script
script_to_start = b'060-cartes.sql'
main(
event={"data": base64.b64encode(script_to_start)},
context=context
)
Вот мой код: runner.py
import logging
import os
from retry import retry
PROJECT_ID = os.getenv('GCLOUD_PROJECT') or 'my-dev'
def orchestrator(query_id, bigquery_client, firestore_client, publisher_client):
"""
if query_id empty, start the first sql script
else, call the given query_id.
Anyway, call the next script.
If the sql script is the last, no call
retrieve SQL queries from FireStore
run queries on BigQuery
"""
docs_refs = [
doc_ref.get() for doc_ref in
firestore_client.collection(u'sql_scripts').list_documents()
]
sorted_queries = sorted(docs_refs, key=lambda x: x.id)
if not bool(query_id.strip()) : # first execution
current_index = 0
else:
# find the query to run
query_ids = [ query_doc.id for query_doc in sorted_queries]
current_index = query_ids.index(query_id)
query_doc = sorted_queries[current_index]
bigquery_client.query(
query_doc.to_dict()['request'], # sql query
).result()
logging.info(
'Query %s executed',
query_doc.id
)
# exit if the current query is the last
if len(sorted_queries) == current_index + 1:
logging.info('All scripts were executed.')
return
next_query_id = sorted_queries[current_index+1].id.encode('utf-8')
publish(publisher_client, next_query_id)
@retry(tries=5)
def publish(publisher_client, next_query_id):
"""
send a message in pubsub to call the next query
this mechanism allow to run one sql script per Function instance
so as to not exceed the 9min deadline limit
"""
logging.info('Calling next query %s', next_query_id)
future = publisher_client.publish(
topic='projects/{}/topics/function-sql-runner'.format(PROJECT_ID),
data=next_query_id
)
# ensure publish is successfull
message_id = future.result()
logging.info('Published message_id = %s', message_id)
Похоже, что сообщение pubsub не является успешным. Я не думаю, что у меня есть фоновая активность в моем коде.
Мой вопрос: почему моя функция случайным образом повторяет попытку даже в случае успеха?