Как отслеживать журналы Stackdriver для конкретной таблицы c в Bigquery в режиме реального времени, используя python? - PullRequest
0 голосов
/ 30 января 2020

Я использую инструмент BI для просмотра данных в Bigquery. Данные распределены по нескольким таблицам, поэтому у меня есть несколько JOINS в запросе, чтобы получить данные в нужном мне формате. Поскольку обработка всех этих данных заняла минуту, и я хотел бы получить опыт в реальном времени, я создал запланированный запрос для запуска запроса, который объединяет и сохраняет выходные данные в таблицу. Данные в таблицах подаются из инструмента ETL каждые 30 минут (инкрементная нагрузка). Это создает Bigquery Job для загрузки данных.

У меня есть одна конкретная таблица, после выполнения задания для этой таблицы я хочу выполнить запланированный запрос.

Я отключил расписание в запланированном запросе и сделал его таким, чтобы он мог работать только во время вызова API. Я написал сценарий python, который отправляет запрос API на запланированный запрос.

Есть ли способ в python, который отслеживает журналы в режиме реального времени для конкретной таблицы Bigquery, поэтому, когда статус задания изменяется на ' Успешно 'для конкретной таблицы, я отправлю запрос API на запланированный запрос для выполнения Запланированного запроса?

Я видел Ведение журнала Stackdriver python код и похоже, что мне нужно неоднократно делать запросы API для имитации мониторинга в реальном времени. Кажется, я не могу отфильтровать результаты в запросах API для конкретной таблицы, я написал несколько сценариев, чтобы сделать это для меня из результатов журнала.

Есть ли библиотека, которая делает это изначально?

Ответы [ 2 ]

0 голосов
/ 04 февраля 2020

Нашел решение, но использует другие сервисы Google.

Cloud Logging имеет функцию под названием Sink, где мы можем направлять журналы в Cloud Pub/Sub topi c.

Мы можем вызвать Cloud Functions из Cloud Pub/Sub.

Cloud Functions с кодом python для отправки запроса API на Scheduled query.

0 голосов
/ 30 января 2020

Если фильтр, который вы упомянули правильно, должен работать:

from google.cloud import logging
from google.cloud.logging import DESCENDING

filter = 'resource.type="bigquery_resource" AND protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId="tableID" AND log_name="projects/projectID/logs/cloudaudit.googleapis.com%2Fdata_access" AND proto_payload.method_name="jobservice.jobcompleted'



for element in logging_client.list_entries(order_by=DESCENDING, filter_=filter): 
     .... YOUR LOGIC HERE ...

Если вам нужно решение в реальном времени (некоторый код, ожидающий поступления журналов), вы должны реализовать его самостоятельно. Код выше принесет все журналы, связанные с вашим фильтром, упорядоченные по убыванию даты. Если вы хотите перечислить только последние журналы, вы также должны изменить свой фильтр, добавив фильтр отметок времени.

Если у вас есть какие-либо вопросы, не стесняйтесь спрашивать меня. Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...