Как отправить запрос сообщения publi sh в Cloud pub / sub после того, как два разных журнала имеют статус DONE в jobStatus? - PullRequest
0 голосов
/ 27 февраля 2020

Я настроил инструмент ETL, который отправляет данные из Oracle в Bigquery и непрерывно отправляет инкрементные данные. Когда данные поступают, они создают задание и регистрируются в журнале Stackdriver.

Когда поступают новые инкрементные данные, некоторые из этих старых данных становятся ненужными, и я удаляю их с помощью запроса.

Я настроил приемник журналов с условием, чтобы при поступлении новых данных отправлялся в виде сообщения в облачный паб / суб и использовал облачный паб / суб для запуска облачных функций. Облачные функции выполняют API-вызов к Запланированному запросу, который выполняет запрос для удаления старых данных.

У меня есть около 20 таблиц (таблица необработанных данных) с той же настройкой для удаления старых данных из соответствующей таблицы.

У меня есть 5 таблиц (таблица Business Intelligence), в которых используются 20 таблиц с некоторыми объединениями и условиями.

Этот запрос также выполняется в запланированном запросе в режиме на основе времени.

Я хочу запустить запланированный запрос для таблицы Business Intelligence вручную, когда два или более запланированных запроса таблицы необработанных данных были успешно выполнены.

Я вижу журнал для каждого Запланированного запроса, используя следующий расширенный фильтр в журнале Stackdriver:

resource.type="bigquery_resource" AND
protoPayload.requestMetadata.callerSuppliedUserAgent="BigQuery Data Transfer Service" AND
protoPayload.serviceData.jobInsertRequest.resource.jobConfiguration.query.destinationTable.tableId="aes_mapbin" AND
protoPayload.serviceData.jobInsertResponse.resource.jobStatus.state="DONE"

То, что я хочу сделать, было: Создать приемник журнала для двух или более Запланированных запросов для вышеуказанного условия фильтра. Если два Запланированных запроса имеют JobStatus как «ВЫПОЛНЕНО», то отправьте сообщение в Cloud pub / подпрограмма, которая будет запускать облачные функции, которые вызывают API-вызов к запланированному запросу, который создает Business In Таблица рассказов.

Возможно ли это сделать? или я создаю сценарий logi c в облачных функциях python для сохранения состояния в корзине Cloud или в самом Bigquery, а затем при появлении сообщения снова сбрасываю значения, хранящиеся в?

My current python скрипт для выполнения API-вызова к запланированному запросу:

import base64
from google.cloud import bigquery_datatransfer
from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.json_format import MessageToJson

client = bigquery_datatransfer.DataTransferServiceClient()
project = 'projectID' 
parent = client.project_path(project)
start_time = Timestamp()   
start_time.GetCurrentTime()

def hello_pubsub(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    response = client.start_manual_transfer_runs(
        'Transfer configuration resource name',
        requested_run_time = start_time
        )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...