Очистите файл CSV, записанный в GCS, удалив кавычки и LZIP файл. Нужно ли копировать файл на локальный компьютер, чтобы выполнить очистку, и lzip-файл, как это можно получить?
Переместить очищенный файл LZIP на S3. Может ли поток данных связываться с S3 и записывать файлы?Как я могу получить это
Пример кода ниже
import logging
import apache_beam as beam
PROJECT='project_id'
BUCKET='project_bucket'
def run():
argv = [
'--project={0}'.format(PROJECT),
'--job_name=readwritebq',
'--save_main_session',
'--staging_location=gs://{0}/staging/'.format(BUCKET),
'--temp_location=gs://{0}/staging/'.format(BUCKET),
'--runner=DataflowRunner'
]
with beam.Pipeline(argv=argv) as p:
# Execute the SQL in big query and store the result data set into given
Destination big query table.
BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query = 'Select * from `dataset.table`',
use_standard_sql=True))
# Destination BQtable
BQ_DATA | 'Write_bq_table' >> beam.io.WriteToBigQuery(
table='tablename',
dataset='datasetname',
project='project_id',
schema='name:string,gender:string,count:integer',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
# write the data from BQ_DATA to GCS in CSV format.
BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())
BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
lambda row: ', '.join(['"' + str(column) + '"' for column in row]))
BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv',
header='word, word count, corpus')
# Clean up the CSV file written to GCS removing the quotes and LZIP the file
**Do we have to copy the file to local to perform the cleanup and lzip the file
, how this can be acheived ?**
# Move the cleaned LZIP file to S3
**Can datflow communicate to S3 and write files ? how can i acheive this**
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()