Я начинаю пробовать Google Cloud Dataflow, после примера classi c wordcount
я написал свой собственный скрипт:
import argparse
import sys
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
class Split(beam.DoFn):
def process(self, element):
(numfact, bag, type, owner,
main_owner, client) = element.splt('\t')
return [{
'numfact': int(numfact),
'type': type,
'owner': owner
}]
parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, extra_args = parser.parse_known_args(sys.argv[1:])
options = PipelineOptions(extra_args)
p = beam.Pipeline(options=options)
print(known_args)
print(extra_args)
csv_lines = (p | "Load" >> ReadFromText(known_args.input, skip_header_lines=1) | "Process" >> beam.ParDo(Split()) | "Write" >> WriteToText(known_args.output))
Вот пример из входного файла:
Numfact BAG TYPE OWNER MAIN OWNER CLIENT
728632636 CNT Alternativos Kramer Ortiz ACCIDENTES PERSONALES TELETICKET Rimac
704845964 CNT Alternativos Kramer Ortiz SOAT Canal
701387639 CNT SIN ASIGNAR Sin asignar WEB VEHICULOS Canal
692571746 CNT Concesionarios Kramer Ortiz WEB VEHICULOS Canal
682823453 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
682823452 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
682823451 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
682823454 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
706853395 CNT Alternativos Kramer Ortiz ACCIDENTES PERSONALES - WEB Canal
706466281 CNT Alternativos Kramer Ortiz SOAT Canal
Наконец, я вызываю это для выполнения следующим образом (файл сохраняется как .txt):
python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1
После этого он показывает отпечатки на консоли, но не регистрирует выполнение в консоли DataFlow.
Обновление
Вот так выглядит консоль:
(gcp) gocht@~/script$ python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1
Namespace(input='gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt', output='gs://dummy_bucket/outputs') ['--runner', 'DataflowRunner', '--project', 'dummyproject-268120', '--temp_location', 'gs://dummy_bucket/tmp', '--region', 'us-central1']
Здесь отображаются только отпечатки, размещенные в коде скрипта.
Что я скучаю?
Спасибо!