Как настроить рабочие вакансии в Google BigQuery? - PullRequest
1 голос
/ 31 января 2020

У меня есть несколько заданий, например, одна загружает текстовый файл из хранилища облачного хранилища Google в таблицу bigquery, а другая - запланированный запрос для копирования данных из одной таблицы в другую с некоторым преобразованием, я хочу второй работа зависит от успеха первого, как мы можем достичь этого в больших запросах, если это вообще возможно сделать?

Большое спасибо.

С уважением,

Ответы [ 3 ]

2 голосов
/ 31 января 2020

В настоящее время разработчик должен собрать цепочку операций. Это можно сделать с помощью облачных функций (поддерживает Node.js, Go, Python) или с помощью контейнера Cloud Run (поддерживает gcloud API, любой язык программирования).

В основном вам необходимо

  1. выдать задание
  2. получить идентификатор задания
  3. опрос для идентификатора задания
  4. задание завершено, вызвать другие шаги

При использовании облачных функций

  1. поместите файл в выделенную корзину GCS
  2. настройте GCF, который отслеживает эту корзину, и при загрузке нового файла он выполнит функцию импорта в GCS - дождитесь окончания операций
  3. в конце GCF, вы можете запускать другие функции для следующего шага

другой вариант использования с облачными функциями:

A: триггер запускает GCF
B: функция выполняет запрос (копирует данные в другую таблицу)
C: получает идентификатор задания - запускает другую функцию с небольшой задержкой

I : функция получает идентификатор задания
J: опросы на работу готовы?
K: если не готов, запускается снова с небольшой задержкой
L: если готовность запускает следующий шаг - это может быть выделенная функция или параметризованная функция

1 голос
/ 03 февраля 2020

С вашим сценарием можно обращаться либо с помощью облачных функций (CF), либо с помощью планировщика (воздушный поток). Первый подход основан на событиях, что приводит к немедленному получению данных. Планировщик ожидает задержки доступности данных.

Как уже указывалось, когда вы отправляете задание BigQuery, вы получаете обратно идентификатор задания, который необходимо проверить до его завершения. Затем, основываясь на статусе, вы можете обрабатывать сообщения об успешных или неудачных действиях соответственно.

Если вы разрабатываете CF, обратите внимание, что существуют определенные ограничения, такие как время выполнения (макс. 9 минут), к которым вам нужно будет обратиться в дело BigQuery занимает более 9 минут. Еще одна проблема с CF - это идемпотентность, обеспечивающая, что если одно и то же событие файла данных происходит более одного раза, обработка не должна приводить к дублированию данных.

В качестве альтернативы, вы можете рассмотреть возможность использования некоторых управляемых событиями проектов с открытым исходным кодом без сервера, таких как BqTail - Google Cloud Storage BigQuery Loader с преобразованием после загрузки.

Вот пример правила bqtail.

rule.yaml

When:
  Prefix: "/mypath/mysubpath"
  Suffix: ".json"
Async: true
Batch:
  Window:
    DurationInSec: 85
Dest:
  Table: bqtail.transactions
  Transient:
    Dataset: temp
    Alias: t
  Transform:
    charge: (CASE WHEN type_id = 1 THEN t.payment + f.value WHEN type_id = 2 THEN t.payment * (1 + f.value) END)
  SideInputs:
    - Table: bqtail.fees
      Alias: f
      'On': t.fee_id = f.id
OnSuccess:
  - Action: query
    Request:
      SQL: SELECT
        DATE(timestamp) AS date,
        sku_id,
        supply_entity_id,
        MAX($EventID) AS batch_id,
        SUM( payment) payment,
        SUM((CASE WHEN type_id = 1 THEN t.payment + f.value WHEN type_id = 2 THEN t.payment * (1 + f.value) END)) charge,
        SUM(COALESCE(qty, 1.0)) AS qty
        FROM $TempTable t
        LEFT JOIN bqtail.fees f ON f.id = t.fee_id
        GROUP BY 1, 2, 3
      Dest: bqtail.supply_performance
      Append: true
    OnFailure:
      - Action: notify
        Request:
          Channels:
            - "#e2e"
          Title: Failed to aggregate data to supply_performance
          Message: "$Error"
    OnSuccess:
      - Action: query
        Request:
          SQL: SELECT CURRENT_TIMESTAMP() AS timestamp, $EventID AS job_id
          Dest: bqtail.supply_performance_batches
          Append: true
      - Action: delete
1 голос
/ 31 января 2020

Вы хотите использовать инструмент оркестровки, особенно если вы хотите настроить эту задачу на задание повторяющихся заданий. Мы используем Google Cloud Composer, который является управляемой службой, основанной на Airflow , для согласования рабочих процессов и отлично работает. Он поставляется с автоматической повторной попыткой, мониторингом, оповещением и многим другим.

Возможно, вы захотите попробовать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...