Google Dataflow: вставка + обновление в BigQuery в потоковом конвейере - PullRequest
0 голосов
/ 29 ноября 2018

Основной объект

Потоковый конвейер Python, в котором я читаю входные данные из pub / sub.

После анализа входных данных доступны две опции:

  • Если x = 1 -> вставить
  • Если x = 2 -> обновить

Проверка

  • Этого нельзя сделать с помощью функции луча apache, поэтому его необходимо разработать с использованием API BigQuery 0.25 (в настоящее время это версия, поддерживаемая в Google Dataflow)

Проблема

  • Вставленная запись все еще находится в буфере BigQuery, поэтому оператор обновления завершается ошибкой:

         UPDATE or DELETE statement over table table would affect rows in the streaming buffer, which is not supported
    

Код

Вставка

def insertCanonicalBQ(input):
    from google.cloud import bigquery
    client = bigquery.Client(project='project')
    dataset = client.dataset('dataset')
    table = dataset.table('table' )
    table.reload()
    table.insert_data(
        rows=[[values]])

Обновление

def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    import time
    client = bigquery.Client()
    STD= "#standardSQL"
    QUERY= STD + "\n" + """UPDATE table SET field1 = 'XXX' WHERE field2=  'YYY'"""
    client.use_legacy_sql = False    
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    while True:
         query_job.reload()  # Refreshes the state via a GET request.
         if query_job.state == 'DONE':
             if query_job.error_result:
                 raise RuntimeError(query_job.errors)
             print "done"
             return input
             time.sleep(1)

1 Ответ

0 голосов
/ 01 декабря 2018

Даже если бы строки не было в буфере потоковой передачи, это не было бы способом решения этой проблемы в BigQuery.Хранилище BigQuery лучше подходит для массовых мутаций, чем для мутаций отдельных объектов, таких как UPDATE.Ваш шаблон соответствует тому, что я ожидал от транзакционного, а не аналитического варианта использования.

Для этого рассмотрим шаблон на основе дополнений.Каждый раз, когда вы обрабатываете сообщение объекта, пишите его в BigQuery с помощью потоковой вставки.Затем, при необходимости, вы можете получить последнюю версию всех сущностей с помощью запроса.

В качестве примера давайте предположим произвольную схему: idfield - это ваш уникальный ключ / идентификатор объекта, а message_time представляет время, когда было отправлено сообщение.У ваших сущностей может быть много других полей.Чтобы получить последнюю версию сущностей, мы могли бы выполнить следующий запрос (и, возможно, записать его в другую таблицу):

#standardSQL
SELECT
  idfield,
  ARRAY_AGG(
    t ORDER BY message_time DESC LIMIT 1
  )[OFFSET(0)].* EXCEPT (idfield)
FROM `myproject.mydata.mytable` AS t
GROUP BY idfield

Дополнительным преимуществом этого подхода является то, что он также позволяет выполнять анализ впроизвольные моменты времени.Чтобы выполнить анализ сущностей по состоянию на час назад, просто добавьте предложение WHERE: WHERE message_time <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)

...