Я настроил инструмент ETL, который отправляет данные из Oracle в Bigquery и непрерывно отправляет инкрементные данные. Когда данные поступают, они создают задание и регистрируются в журнале Stackdriver.
Когда поступают новые инкрементные данные, некоторые из этих старых данных становятся ненужными, и я удаляю их с помощью запроса.
Я настроил приемник журналов с условием, чтобы при поступлении новых данных отправлялся в виде сообщения в облачный паб / суб и использовал облачный паб / суб для запуска облачных функций. Облачные функции выполняют API-вызов к Запланированному запросу, который выполняет запрос для удаления старых данных.
У меня есть около 20 таблиц (таблица необработанных данных) с той же настройкой для удаления старых данных из соответствующей таблицы.
У меня есть 5 таблиц (таблица Business Intelligence), в которых используются 20 таблиц с некоторыми объединениями и условиями.
Этот запрос также выполняется в запланированном запросе в режиме на основе времени.
Я хочу запустить запланированный запрос для таблицы Business Intelligence вручную, когда два или более запланированных запроса таблицы необработанных данных были успешно выполнены.
Я вижу журнал для каждого Запланированного запроса, используя следующий расширенный фильтр в журнале Stackdriver:
resource.type="bigquery_resource" AND
protoPayload.requestMetadata.callerSuppliedUserAgent="BigQuery Data Transfer Service" AND
protoPayload.serviceData.jobInsertRequest.resource.jobConfiguration.query.destinationTable.tableId="aes_mapbin" AND
protoPayload.serviceData.jobInsertResponse.resource.jobStatus.state="DONE"
То, что я хочу сделать, было: Создать приемник журнала для двух или более Запланированных запросов для вышеуказанного условия фильтра. Если два Запланированных запроса имеют JobStatus как «ВЫПОЛНЕНО», то отправьте сообщение в Cloud pub / подпрограмма, которая будет запускать облачные функции, которые вызывают API-вызов к запланированному запросу, который создает Business In Таблица рассказов.
Возможно ли это сделать? или я создаю сценарий logi c в облачных функциях python для сохранения состояния в корзине Cloud или в самом Bigquery, а затем при появлении сообщения снова сбрасываю значения, хранящиеся в?
My current python скрипт для выполнения API-вызова к запланированному запросу:
import base64
from google.cloud import bigquery_datatransfer
from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.json_format import MessageToJson
client = bigquery_datatransfer.DataTransferServiceClient()
project = 'projectID'
parent = client.project_path(project)
start_time = Timestamp()
start_time.GetCurrentTime()
def hello_pubsub(event, context):
"""Triggered from a message on a Cloud Pub/Sub topic.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
response = client.start_manual_transfer_runs(
'Transfer configuration resource name',
requested_run_time = start_time
)