Google Dataflow: выполнение динамического запроса с BigQuery + Pub / Sub на Python - PullRequest
0 голосов
/ 20 ноября 2018

Что я хотел бы сделать в конвейере:

  1. Чтение из pub / sub (выполнено)
  2. Преобразовать эти данные в словарь (выполнено)
  3. Взять значение указанного ключа из dict (готово)
  4. Запустить параметризованный / динамический запрос из BigQuery, в котором часть where должна быть такой:

    SELECT field1 FROM Table where field2 = @valueFromP/S
    

Конвейер

| 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription='')
| 'String to dictionary' >> beam.Map(lambda s:data_ingestion.parse_method(s))
| 'BigQuery' >> <Here is where I'm not sure how to do it>

Обычный способ чтения из BQ будет выглядеть так:

| 'Read' >> beam.io.Read(beam.io.BigQuerySource(
                query="SELECT field1 FROM table where field2='string'", use_standard_sql=True))

Я читал о параметризованных запросах , но я не уверен, что это будет работать с Apache Beam.

Это можно сделать с помощью боковых входов?

Какой самый лучший способ сделать это?


Что я пробовал:

def parse_methodBQ(input):
    query=''SELECT field1 FROM table WHERE field1=\'%s\' AND field2=True' % (input['field1'])'
    return query


class ReadFromBigQuery(beam.PTransform):
    def expand(self, pcoll):
        return (
                pcoll
                | 'FormatQuery' >> beam.Map(parse_methodBQ)
                | 'Read' >> beam.Map(lambda s:  beam.io.Read(beam.io.BigQuerySource(query=s)))
        )

with beam.Pipeline(options=pipeline_options) as p:
transform = (p  | 'BQ' >> ReadFromBigQuery()

Результат (зачем это?):

<Read(PTransform) label=[Read]>

Правильный результат должен выглядеть следующим образом:

{u'Field1': u'string', u'Field2': Bool}

РЕШЕНИЕ

В конвейере:

| 'BQ' >> beam.Map(parse_method_BQ))

Функция(используя API BigQuery 0.25 для потока данных)

def parse_method_BQ(input):
    client = bigquery.Client()
    QUERY = 'SELECT field1 FROM table WHERE field1=\'%s\' AND field2=True' % (input['field1'])
    client.use_legacy_sql = False
    query_job = client.run_async_query(query=QUERY ,job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    while True:
        query_job.reload()  # Refreshes the state via a GET request.
        if query_job.state == 'DONE':
            if query_job.error_result:
                raise RuntimeError(query_job.errors)
            rows = query_job.results().fetch_data()
            for row in rows:
                if not (row[0] is None):  
                    return input
        time.sleep(1)

1 Ответ

0 голосов
/ 22 ноября 2018

Вы можете прочитать всю таблицу или использовать строковый запрос .

Я понимаю, что вы будете использовать метод parse_methodBQ для настройки запроса по мере необходимости.Поскольку этот метод возвращает запрос, вы можете вызвать его с помощью BigQuerySource.Строки находятся в словаре.

| 'QueryTable' >> beam.Map(beam.io.BigQuerySource(parse_methodBQ))
# Each row is a dictionary where the keys are the BigQuery columns
| 'Read' >> beam.Map(lambda s:  s['data'])

Более того, вы можете избежать необходимости настраивать запрос и использовать метод фильтрации

Что касается боковых входов, просмотрите это пример из поваренной книги, чтобы лучше понять, как их использовать.

...