Как запустить 2 облачных функции асинхронно с использованием Python? - PullRequest
0 голосов
/ 04 ноября 2019

У меня есть 3 облачных функции GCP, написанных на python, а именно CF1, CF2, CF3. CF1 проверит определенные условия и соответственно должен выполнить параллельное выполнение CF2 и CF3.

Я пробовал

if condition is true:
    requests.get("url of CF2")
    print("CF2 executed successfully")
    requests.get("url of CF3")
    print("CF3 executed successfully")

Код CF1:

import requests

static_query = "select * from `myproject.mydataset.mytable`"
    try:

        # Executing query and loading data into temporary table.
        client = bigquery.Client()
        job_config = bigquery.QueryJobConfig()
        dest_dataset = client.dataset(temporary_dataset, temporary_project)
        dest_table = dest_dataset.table(temporary_table)
        job_config.destination = dest_table
        job_config.create_disposition = 'CREATE_IF_NEEDED'
        job_config.write_disposition = 'WRITE_TRUNCATE'
        query_job = client.query(static_query, location=bq_location, job_config=job_config)
        query_job.result()
        table = client.get_table(dest_table)
        expiration = (datetime.now() + timedelta(minutes=expiration_time))
        table.expires = expiration
        table = client.update_table(table, ["expires"])
        logging.info("Query result loaded into temporary table: {}".format(temporary_table))

        # Check row count of resultant query from temporary table.
        count_query = "select count(*) size from `{}.{}.{}`".format(temporary_project, temporary_dataset,
                                                                    temporary_table)
        job = client.query(count_query)
        results = job.result()
        count = 0
        for row in results:
            count = row.size

        # If row count of query result is empty log error message on stack-driver.
        if count == 0:
            logging.error("Query executed with empty result set.")

        # If row count of query result has records then trigger below two cloud functions (this should be parallel execution).
        else:
            # Trigger CF2 cloud function.
            requests.get("{}".format(cf2_endpoint))
            logging.info("CF2 executed successfully.")

            # Trigger CF3 cloud function.
            requests.get("{}".format(cf3_endpoint))
            logging.info("CF3 executed successfully.")
    except RuntimeError:
        logging.error("Exception occurred {}".format(error_log_client.report_exception()))

Здесь я хочу выполнить CF2 и CF3 асинхронно. За любые предложения и решения Спасибо заранее.

Ответы [ 2 ]

0 голосов
/ 04 ноября 2019

Если вам нужно выполнять асинхронные запросы и вы используете Python, вы можете попробовать использовать библиотеки aiohttp или asyncio , здесь есть один пример здесь . Кроме того, вы можете проверить Cloud Pub / Sub или Cloud Cloud Tasks .

0 голосов
/ 04 ноября 2019

Лучший сервис для выполнения асинхронного вызова - это PubSub. Для этого вам необходимо:

  • Создать тему PubSub
  • Развернуть CF2 с триггером на событие PubSub в ранее созданной теме
  • Развернуть CF3 с помощьюзапуск по событию PubSub по ранее созданной теме
  • Функция CF1 создает сообщение PubSub с параметрами или без них и публикует их
  • Функции CF2 и CF3 запускаются параллельно с сообщением, опубликованным в PubSub. Они извлекают из него параметры и выполняют процесс

Если CF2 и CF3 уже существуют и запускаются HTTP-вызовом, вы можете настроить принудительную HTTP-подписку на тему PubSub.

Если происходит сбой CF2 или CF3, сообщение повторно отправляется функции до действительного подтверждения (2XX HTTP-ответа) или сообщения TTL (7 дней по умолчанию).

Кстати, вы не связаны, вымасштабируемы, вы параллельны, и CF по ошибке повторяется.

...