Записывать результаты BigQuery в GCS в формате CSV, используя Apache Beam - PullRequest
0 голосов
/ 22 октября 2018

Я довольно новичок в работе над Apache Beam, где я пытаюсь написать конвейер для извлечения данных из Google BigQuery и записи данных в GCS в формате CSV с использованием Python.

Использование beam.io.read(beam.io.BigQuerySource())Я могу читать данные из BigQuery, но не знаю, как записать их в GCS в формате CSV.

Есть ли пользовательская функция для достижения того же, не могли бы вы мне помочь?

import logging

import apache_beam as beam


PROJECT='project_id'
BUCKET='project_bucket'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
]

with beam.Pipeline(argv=argv) as p:

    # Execute the SQL in big query and store the result data set into given Destination big query table.
    BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
        beam.io.BigQuerySource(query =  'Select * from `dataset.table`', use_standard_sql=True))
    # Extract data from Bigquery to GCS in CSV format.
    # This is where I need your help

    BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
            table='tablename',
            dataset='datasetname',
            project='project_id',
            schema='name:string,gender:string,count:integer',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)

if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   run()

1 Ответ

0 голосов
/ 22 октября 2018

Вы можете сделать это, используя WriteToText, чтобы добавить суффикс .csv и headers.Учтите, что вам нужно будет проанализировать результаты запроса в формате CSV.В качестве примера я использовал общедоступный шекспировский набор данных и следующий запрос:

SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(слово)> 3 ORDER BY word_count DESC LIMIT 10

Теперь мы читаем результаты запроса следующим образом:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))

BQ_DATA теперь содержит пары ключ-значение:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}

Мы можем применить функцию beam.Map, чтобы получить только значения:

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

Выдержка из BQ_VALUES:

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]

И, наконец, отобразить снова, чтобы всезначения столбца, разделенные запятыми вместо списка (учтите, что вам нужно будет избегать двойных кавычек, если они могут появляться в поле):

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))

Теперь мы запишем результаты в GCS с суффиксом иЗаголовки:

BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')

Письменные результаты:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"
...