С вашим сценарием можно обращаться либо с помощью облачных функций (CF), либо с помощью планировщика (воздушный поток). Первый подход основан на событиях, что приводит к немедленному получению данных. Планировщик ожидает задержки доступности данных.
Как уже указывалось, когда вы отправляете задание BigQuery, вы получаете обратно идентификатор задания, который необходимо проверить до его завершения. Затем, основываясь на статусе, вы можете обрабатывать сообщения об успешных или неудачных действиях соответственно.
Если вы разрабатываете CF, обратите внимание, что существуют определенные ограничения, такие как время выполнения (макс. 9 минут), к которым вам нужно будет обратиться в дело BigQuery занимает более 9 минут. Еще одна проблема с CF - это идемпотентность, обеспечивающая, что если одно и то же событие файла данных происходит более одного раза, обработка не должна приводить к дублированию данных.
В качестве альтернативы, вы можете рассмотреть возможность использования некоторых управляемых событиями проектов с открытым исходным кодом без сервера, таких как BqTail - Google Cloud Storage BigQuery Loader с преобразованием после загрузки.
Вот пример правила bqtail.
rule.yaml
When:
Prefix: "/mypath/mysubpath"
Suffix: ".json"
Async: true
Batch:
Window:
DurationInSec: 85
Dest:
Table: bqtail.transactions
Transient:
Dataset: temp
Alias: t
Transform:
charge: (CASE WHEN type_id = 1 THEN t.payment + f.value WHEN type_id = 2 THEN t.payment * (1 + f.value) END)
SideInputs:
- Table: bqtail.fees
Alias: f
'On': t.fee_id = f.id
OnSuccess:
- Action: query
Request:
SQL: SELECT
DATE(timestamp) AS date,
sku_id,
supply_entity_id,
MAX($EventID) AS batch_id,
SUM( payment) payment,
SUM((CASE WHEN type_id = 1 THEN t.payment + f.value WHEN type_id = 2 THEN t.payment * (1 + f.value) END)) charge,
SUM(COALESCE(qty, 1.0)) AS qty
FROM $TempTable t
LEFT JOIN bqtail.fees f ON f.id = t.fee_id
GROUP BY 1, 2, 3
Dest: bqtail.supply_performance
Append: true
OnFailure:
- Action: notify
Request:
Channels:
- "#e2e"
Title: Failed to aggregate data to supply_performance
Message: "$Error"
OnSuccess:
- Action: query
Request:
SQL: SELECT CURRENT_TIMESTAMP() AS timestamp, $EventID AS job_id
Dest: bqtail.supply_performance_batches
Append: true
- Action: delete