Есть ли способ выполнить неблокирующее load_job из клиентской библиотеки BigQuery Python? - PullRequest
1 голос
/ 19 октября 2019

У меня есть Flask API, который использует Flask_restful, Flask_CORS и Marshmallow. API выполняет некоторую работу по загрузке файла * .csv в облачное хранилище (с использованием подписанногоURL), подтверждает его загрузку, а затем создает и выполняет задание загрузки для передачи CSV из хранилища в BigQuery. Часть API, которая усугубляет мою потерю волос - это вызов для выполнения задания загрузки в GCP, которое загружает CSV-файл в BigQuery. Ниже приведен фрагмент кода:

...
            dataset_ref = bq_client.dataset(target_dataset) 
            job_config.schema =  bq_schema 
            job_config.source_format = SOURCE_FORMAT 
            job_config.field_delimiter =  DELIM  
            job_config.destination_table_description = TARGET_TABLE
            job_config.encoding = ENCODING 
            job_config.max_bad_records = MAX_BAD_RECORDS
            job_config.autodetect = False # Do not autodetect schema
            load_job = bq_client.load_table_from_uri(
                uri, dataset_ref.table(target_table), job_config=job_config
            )  # API request
            load_job.result() # **<-- This is the concern**
            return {"message": "Successfully uploaded to Bigquery"}, 200

Передача файла может занять некоторое время, и меня беспокоит то, что в периоды, когда существует некоторая задержка, веб-сервер отключается по времени, ожидая передачиместо. Я бы предпочел выполнить load_job.result(), получить идентификатор задания и вернуть ответ 201. Затем я могу использовать идентификатор задания для опроса GCP, чтобы определить, был ли он успешным или нет, вместо того, чтобы существовал риск тайм-аута запроса для клиентского интерфейса и оставил пользователя в замешательстве относительно того, был ли он успешным илинет.

Я понимаю, что load_job.result () является асинхронным, но с Flask это не помогает. Я собирался перейти на Quart, чтобы использовать async / await, но другие мои зависимости не поддерживаются, и поэтому у меня будет много работы по рефакторингу. Есть ли другой способ, которым кто-то использовал для решения этого типа проблемы? Приветствия

1 Ответ

1 голос
/ 21 октября 2019

Кварта ничего не решает. Действительно, Quart все еще нуждается в работающей среде, он ожидает и контролирует функцию блокировки и в конце вызывает ваш обратный вызов. Ваша функция должна все еще работать для выполнения этого.

Существует лучший дизайн для этого. Я рекомендую вам взглянуть на Cloud Task . Процесс выглядит следующим образом:

  • Запустите задание загрузки
  • Создайте задание с идентификатором задания загрузки в параметре
  • Выйдите из функции
  • Задача вызовет другую функцию, которая будет проверять, закончено ли задание
    • Если еще не закончено, вернуть код ошибки (отличный от 2XX).
    • Если закончите, верните код возврата OK (2XX)

Вы должны настроить облачную задачу с политикой повтора дляне повторять попытку сразу (например, установить min-backoff в 30 с)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...