Запустить скрипт на GCP Dataflow - PullRequest
0 голосов
/ 25 февраля 2020

Я начинаю пробовать Google Cloud Dataflow, после примера classi c wordcount я написал свой собственный скрипт:

import argparse
import sys

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class Split(beam.DoFn):

    def process(self, element):
        (numfact, bag, type, owner, 
         main_owner, client) = element.splt('\t')

        return [{
            'numfact': int(numfact),
            'type': type,
            'owner': owner
        }]


parser = argparse.ArgumentParser()

parser.add_argument('--input')
parser.add_argument('--output')

known_args, extra_args = parser.parse_known_args(sys.argv[1:])

options = PipelineOptions(extra_args)
p = beam.Pipeline(options=options)
print(known_args)
print(extra_args)
csv_lines = (p | "Load" >> ReadFromText(known_args.input, skip_header_lines=1) | "Process" >> beam.ParDo(Split()) | "Write" >> WriteToText(known_args.output))

Вот пример из входного файла:

Numfact BAG TYPE    OWNER   MAIN OWNER  CLIENT
728632636   CNT Alternativos    Kramer Ortiz    ACCIDENTES PERSONALES TELETICKET    Rimac
704845964   CNT Alternativos    Kramer Ortiz    SOAT    Canal
701387639   CNT SIN ASIGNAR Sin asignar WEB VEHICULOS   Canal
692571746   CNT Concesionarios  Kramer Ortiz    WEB VEHICULOS   Canal
682823453   CNT Alternativos    Kramer Ortiz    WEB VEHICULOS   Canal
682823452   CNT Alternativos    Kramer Ortiz    WEB VEHICULOS   Canal
682823451   CNT Alternativos    Kramer Ortiz    WEB VEHICULOS   Canal
682823454   CNT Alternativos    Kramer Ortiz    WEB VEHICULOS   Canal
706853395   CNT Alternativos    Kramer Ortiz    ACCIDENTES PERSONALES - WEB Canal
706466281   CNT Alternativos    Kramer Ortiz    SOAT    Canal

Наконец, я вызываю это для выполнения следующим образом (файл сохраняется как .txt):

python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1

После этого он показывает отпечатки на консоли, но не регистрирует выполнение в консоли DataFlow.

Обновление

Вот так выглядит консоль:

(gcp) gocht@~/script$ python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1
Namespace(input='gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt', output='gs://dummy_bucket/outputs')   ['--runner', 'DataflowRunner', '--project', 'dummyproject-268120', '--temp_location', 'gs://dummy_bucket/tmp', '--region', 'us-central1']

Здесь отображаются только отпечатки, размещенные в коде скрипта.

Что я скучаю?

Спасибо!

Ответы [ 2 ]

1 голос
/ 26 февраля 2020

Поскольку ответ находится в комментарии, напишите его также здесь: *

Вам необходимо запустить конвейер, выполнив: не уверен, что не так, попробуйте взглянуть на примеры здесь - версия java мне очень помогла при запуске с потоком данных:)

1 голос
/ 26 февраля 2020

Вам понадобится

result = p.run()

в конце вашего файла для запуска конвейера.

В основном я думаю, что вы построили свой конвейер, но на самом деле не просили его запустить.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...