Облачные функции Google случайным образом повторяют попытку - PullRequest
1 голос
/ 02 марта 2020

У меня есть облачная функция Google, запускаемая PubSub. Сообщения о состоянии do c подтверждаются после успешного завершения функции. ссылка

Но случайным образом функция повторяется (тот же идентификатор выполнения) ровно через 10 минут после выполнения. Это максимальное время ожидания PubSub.

stackdriver logging

Я также пытался получить идентификатор сообщения и подтвердить его программным способом в коде функции, но API PubSub отвечает, что сообщения с таким идентификатором нет. В мониторинге StackDriver я вижу, что некоторые сообщения не подтверждаются.

stackdriver monitoring

Вот мой код: main.py

import base64
import logging
import traceback

from google.api_core import exceptions
from google.cloud import bigquery, error_reporting, firestore, pubsub

from sql_runner.runner import orchestrator

logging.getLogger().setLevel(logging.INFO)


def main(event, context):

    bigquery_client = bigquery.Client()
    firestore_client = firestore.Client()
    publisher_client = pubsub.PublisherClient()
    subscriber_client = pubsub.SubscriberClient()

    logging.info(
        'event=%s',
        event
    )
    logging.info(
        'context=%s',
        context
    )
    try:
        query_id = base64.b64decode(event.get('data',b'')).decode('utf-8')
        logging.info(
            'query_id=%s',
            query_id
        )

        # inject dependencies
        orchestrator(
            query_id,
            bigquery_client,
            firestore_client,
            publisher_client
        )

        sub_path = (context.resource['name']
            .replace('topics', 'subscriptions')
            .replace('function-sql-runner', 'gcf-sql-runner-europe-west1-function-sql-runner')
        )

        # explicitly ack message to avoid duplicates invocations
        try:
            subscriber_client.acknowledge(
                sub_path,
                [context.event_id]  # message_id to ack
            )
            logging.warning(
                'message_id %s acknowledged (FORCED)',
                context.event_id
            )
        except exceptions.InvalidArgument as err:
            # google.api_core.exceptions.InvalidArgument: 400 You have passed an invalid ack ID to the service (ack_id=982967258971474).
            logging.info(
                'message_id %s already acknowledged',
                context.event_id
            )
            logging.debug(err)

    except Exception as err:
        # catch all exceptions and log to prevent cold boot
        # report with error_reporting
        error_reporting.Client().report_exception()
        logging.critical(
            'Internal error : %s -> %s',
            str(err),
            traceback.format_exc()
        )


if __name__ == '__main__':  # for testing
    from collections import namedtuple  # use namedtuple to avoid Class creation
    Context = namedtuple('Context', 'event_id resource')
    context = Context('666', {'name': 'projects/my-dev/topics/function-sql-runner'})

    script_to_start = b' '   # launch the 1st script
    script_to_start = b'060-cartes.sql'

    main(
        event={"data": base64.b64encode(script_to_start)},
        context=context
    )

Вот мой код: runner.py

import logging
import os

from retry import retry


PROJECT_ID = os.getenv('GCLOUD_PROJECT') or 'my-dev'


def orchestrator(query_id, bigquery_client, firestore_client, publisher_client):
    """ 
    if query_id empty, start the first sql script
    else, call the given query_id. 

    Anyway, call the next script.
    If the sql script is the last, no call

    retrieve SQL queries from FireStore
    run queries on BigQuery
    """
    docs_refs = [ 
        doc_ref.get() for doc_ref in 
        firestore_client.collection(u'sql_scripts').list_documents()
    ]

    sorted_queries = sorted(docs_refs, key=lambda x: x.id)

    if not bool(query_id.strip()) :  # first execution 
        current_index = 0
    else:
        # find the query to run
        query_ids = [ query_doc.id for query_doc in sorted_queries]
        current_index = query_ids.index(query_id)

    query_doc = sorted_queries[current_index]

    bigquery_client.query(
        query_doc.to_dict()['request'],  # sql query
    ).result()

    logging.info(
        'Query %s executed',
        query_doc.id
    )

    # exit if the current query is the last
    if len(sorted_queries) == current_index + 1:
        logging.info('All scripts were executed.')
        return

    next_query_id = sorted_queries[current_index+1].id.encode('utf-8')

    publish(publisher_client, next_query_id)

@retry(tries=5)
def publish(publisher_client, next_query_id):
    """
    send a message in pubsub to call the next query
    this mechanism allow to run one sql script per Function instance
    so as to not exceed the 9min deadline limit
    """
    logging.info('Calling next query %s', next_query_id)

    future = publisher_client.publish(
        topic='projects/{}/topics/function-sql-runner'.format(PROJECT_ID),
        data=next_query_id
    )

    # ensure publish is successfull
    message_id = future.result()
    logging.info('Published message_id = %s', message_id)

Похоже, что сообщение pubsub не является успешным. Я не думаю, что у меня есть фоновая активность в моем коде.

Мой вопрос: почему моя функция случайным образом повторяет попытку даже в случае успеха?

1 Ответ

3 голосов
/ 02 марта 2020

Облачные функции не гарантируют, что ваши функции будут работать ровно один раз. Согласно документации фоновые функции, включая функции pubsub, имеют гарантию не менее одного раза:

Фоновые функции вызываются как минимум один раз, Это связано с асинхронной природой обработки событий, при которой нет вызывающей стороны, ожидающей ответа. В редких случаях система может вызывать фоновую функцию более одного раза, чтобы обеспечить доставку события. Если вызов фоновой функции завершается неудачно с ошибкой, он не будет вызываться снова, если не включены повторные попытки сбоя для этой функции.

Ваш код должен ожидать, что он может получить событие больше, чем один раз. Таким образом, ваш код должен быть идемпотентным:

Чтобы убедиться, что ваша функция ведет себя правильно при повторных попытках выполнения, вы должны сделать ее идемпотентной, реализовав ее так, чтобы событие приводило к желаемым результатам (и побочные эффекты), даже если он доставлен несколько раз. В случае функций HTTP это также означает возвращение желаемого значения, даже если вызывающий абонент повторяет вызовы к конечной точке функции HTTP. См. Повтор фоновых функций для получения дополнительной информации о том, как сделать вашу функцию идемпотентной.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...