Чтение нескольких строк из Bigquery в качестве побочного ввода, получая None - PullRequest
0 голосов
/ 26 ноября 2018

У меня проблемы с побочными входами, особенно BQ, для конвейеров потока данных, даже после того, как я пошел на курсы и посмотрел примеры.

Прямо сейчас у меня есть конвейер, который читает файлы в корзине gcs, получает их имена файлов, а затем преобразует файл и записывает заданное количество строк в bigquery.Я пытаюсь выяснить, как сопоставить имя файла с конкретным «ключом» из bq.

result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
result = reduce(add, result)
#create each input PCollection name
variables = ['p{}'.format(i) for i in range(len(result))]

На основе результата я строю кортеж из всех имен файлов (filename1, filename2…), создаюдинамический запрос: Bqquery = "SELECT FILENAME, FILE_ID from 'project:dataset.table' where FILENAME IN (filename tuple)" Я думал, что сделаю это, так как это будет примерно 20 файлов одновременно, поэтому имеет смысл получить данные из bq один раз, вместо того, чтобы получить file_id внутри цикла for.

Итак, я сделал

Bqcollection = p | 'Get File_Id' >> beam.io.Read(beam.io.BigQuerySource(query=bqquery))
But the result I get is none 


for i in range(len(result)):
    current_file = result[i].path
    #query inside for loop 
    #bqquery= "SELECT FILE_ID" from 'project:dataset.table' where FILENAME = '{0}'".format(current_file)
   # file_id = p | 'GetFile_id_{0}'.format(i) >> beam.io.Read(beam.io.BigQuerySource(query=bqquery))
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), current_file)

Я также попытался выполнить запрос внутри цикла for, чтобы получить только одно имя файла за раз (см. Закомментированный код), но это не сработалоили.В конечном счете, я хочу изменить beam.Pardo(AddFilenamesFn(), current_file) на beam.Pardo(AddFileNamesFn(), file_id), поэтому вместо добавления фактического имени файла я добавляю fileid

[обратите внимание, что в коде упоминаются метки (т.е. read_labels [i])просто ярлыки для потока данных]

Полагаю, я упускаю что-то довольно простое в pcollections, но не уверен

1 Ответ

0 голосов
/ 30 ноября 2018

Я думаю, что самым простым решением с учетом кода в предыдущем вопросе было бы выполнение запросов внутри AddFilenamesFn ParDo внутри цикла for.Помните, что beam.io.Read(beam.io.BigQuerySource(query=bqquery)) используется для чтения строк как источника, а не на промежуточном этапе.Таким образом, в случае, который я предлагаю, вы можете напрямую использовать клиентскую библиотеку Python (google-cloud-bigquery>0.27.0):

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with file id (retrieved from BigQuery) and row"""
    def process(self, element, file_path):
        from google.cloud import bigquery

        client = bigquery.Client()
        file_name = file_path.split("/")[-1]

        query_job = client.query("""
            SELECT FILE_ID
            FROM test.file_mapping
            WHERE FILENAME = '{0}'
            LIMIT 1""".format(file_name))

        results = query_job.result()

        for row in results:
          file_id = row.FILE_ID

        yield {'filename':file_id, 'row':element}

Это было бы наиболее простым решением для реализации, но оно может вызвать проблему.Вместо того, чтобы запускать все ~ 20 возможных запросов в начале конвейера, мы выполняем запрос для каждой строки / записи.Например, если у нас есть 3000 элементов в одном файле, один и тот же запрос будет запущен 3000 раз.Однако каждый отдельный запрос должен выполняться только один раз, и последующий запрос «повторяется» попадет в кеш .Также обратите внимание, что кэшированные запросы не влияют на интерактивный запрос limit .

Я использовал те же файлы моего предыдущего ответа :

$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain

gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france

и добавил новую таблицу:

bq mk test.file_mapping FILENAME:STRING,FILE_ID:STRING
bq query --use_legacy_sql=false 'INSERT INTO test.file_mapping (FILENAME, FILE_ID) values ("countries1.csv", "COUNTRIES ONE"), ("countries2.csv", "COUNTRIES TWO")'

enter image description here

, и получится:

INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'id,country'}
INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'1,sweden'}
INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'2,spain'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'id,country'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'3,italy'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'4,france'}

Другое решение будетзагрузите всю таблицу и материализуйте ее как побочный ввод (в зависимости от размера это, конечно, может быть проблематично) с помощью beam.io.BigQuerySource() или, как вы говорите, разбейте его на N запросов и сохраните каждый на другой боковой ввод.Затем вы можете выбрать подходящий для каждой записи и передать его в качестве дополнительного ввода в AddFilenamesFn.Было бы интересно попробовать написать и это.

Полный код моего первого предложенного решения:

import argparse, logging
from operator import add

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with file id (retrieved from BigQuery) and row"""
    def process(self, element, file_path):
        from google.cloud import bigquery

        client = bigquery.Client()

        file_name = file_path.split("/")[-1]

        query_job = client.query("""
            SELECT FILE_ID
            FROM test.file_mapping
            WHERE FILENAME = '{0}'
            LIMIT 1""".format(file_name))

        results = query_job.result()

        for row in results:
          file_id = row.FILE_ID

        yield {'filename':file_id, 'row':element}

# just logging output to visualize results
def write_res(element):
  logging.info(element)
  return element

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  p = beam.Pipeline(options=PipelineOptions(pipeline_args))
  gcs = GCSFileSystem(PipelineOptions(pipeline_args))
  gcs_reader = GCSFileReader(gcs)

  # in my case I am looking for files that start with 'countries'
  BUCKET='BUCKET_NAME'
  result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
  result = reduce(add, result)

  # create each input PCollection name and unique step labels
  variables = ['p{}'.format(i) for i in range(len(result))]
  read_labels = ['Read file {}'.format(i) for i in range(len(result))]
  add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

  # load each input file into a separate PCollection and add filename to each row
  for i in range(len(result)):
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

  # flatten all PCollections into a single one
  merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

  p.run()

if __name__ == '__main__':
  run()
...