Вы можете сделать это, используя WriteToText
, чтобы добавить суффикс .csv
и headers
.Учтите, что вам нужно будет проанализировать результаты запроса в формате CSV.В качестве примера я использовал общедоступный шекспировский набор данных и следующий запрос:
SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(слово)> 3 ORDER BY word_count DESC LIMIT 10
Теперь мы читаем результаты запроса следующим образом:
BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query=query, use_standard_sql=True))
BQ_DATA
теперь содержит пары ключ-значение:
{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}
Мы можем применить функцию beam.Map
, чтобы получить только значения:
BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())
Выдержка из BQ_VALUES
:
[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]
И, наконец, отобразить снова, чтобы всезначения столбца, разделенные запятыми вместо списка (учтите, что вам нужно будет избегать двойных кавычек, если они могут появляться в поле):
BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))
Теперь мы запишем результаты в GCS с суффиксом иЗаголовки:
BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')
Письменные результаты:
$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"