Синтаксис Apache Beam Python - PullRequest
       17

Синтаксис Apache Beam Python

0 голосов
/ 26 августа 2018

Итак, я только начинаю работать с Apache Beam. Я планирую запускать задания DataFlow в GCP, я изначально запускал их с DataPrep, но быстро перерос их функциональность. Предостерегаю, я программирую на Python 2/3 уже 2 года, поэтому я думаю, что перешел от новичка к любителю, просто для вашего осознания. Итак, вот моя проблема, я успешно написал некоторый код AB (версия 2.6) в моей IDE. Но я ничего не мог заставить работать. То есть, даже после чтения в CSV-файл на PCollection, я не мог видеть, что это сработало. То есть он просто говорит: «Объект PCollection в 0xf3a6 ...»

Итак, я видел, как другие люди писали об этом, когда я лихорадочно гуглил, и они сказали, что вы должны использовать выражение «с», чтобы Python автоматически открывал и закрывал линию? Итак, как только я это сделал, я по крайней мере смог записать вывод того, что я только что прочитал, в файл, чтобы увидеть, что ЧТО-ТО произошло. Итак, во-первых, я нахожу действительно странным, что один и тот же код, который я написал раньше, ничего не делал, пока я не вставил его в оператор with ... что с этим? Нужно ли делать все для конвейера в операторе with? И другие определения только для обычного Python? Вот код:

def run(self, argv=None):

    #p = beam.Pipeline()
    with beam.Pipeline(options=PipelineOptions()) as p:
        left_side = p | 'Read_Left_Side' >> beam.io.ReadFromText('/me/left_side_table.csv')
        left_side | 'Write' >> beam.io.WriteToText('/me/', file_name_suffix='purple_nurple.csv')
        right_side = p | 'Read_Right_Side' >> beam.io.ReadFromText('/me/right_side_table.csv')
    # left_side = p | 'Read_Left_Side' >> beam.io.ReadFromText('gs://path/to/left_side.csv')
    # right_side = p | 'Read_Right_Side' >> beam.io.ReadFromText('gs://path/to/right_side.csv')

    hello=[1,2,3,4,5,6]|beam.Map(lambda x: 3**x)

    left_side = p | 'Read' >> beam.io.ReadFromText('/me/left_side_table.csv')
    left_side | 'Write' >> beam.io.WriteToText('/me/', file_name_suffix='purple_nurple.csv')
    print(left_side)
    right_side = p | 'Read' >> beam.io.ReadFromText('/me//right_side_table.csv')
    howdy= left_side|beam.Map(lambda x: x/2)
    pass

1 Ответ

0 голосов
/ 26 августа 2018

Вам нужно вызвать piepleine.run () для выполнения конвейера.Пучковый конвейер также следует идиоме ресурсов, упомянутой здесь https://docs.python.org/2.7/reference/compound_stmts.html#the-with-statement Поэтому, когда вы используете with pipeline, вам не нужно вызывать pipe.run ().Вы можете использовать любой подход в вашем коде.Чтобы ответить на ваши вопросы

Итак, во-первых, я нахожу действительно странным, что один и тот же код, который я написал ранее, ничего не делал, пока не вставил его в утверждение with ... что с этим?

Beam Pipeline следует идиоме здесь https://docs.python.org/2.7/reference/compound_stmts.html#the-with-statement

Нужно ли делать все для конвейера в операторе with?

Если вы используете идиому ресурса, то да.Но если вы вызываете pipe.run () сами, тогда с оператором нет.В вашем коде вы используете 'with', поэтому изменения конвейера после 'with' не применяются к заданию.

А другие определения предназначены только для обычного Python?

Что определяет?

...