Итак, я только начинаю работать с Apache Beam. Я планирую запускать задания DataFlow в GCP, я изначально запускал их с DataPrep, но быстро перерос их функциональность. Предостерегаю, я программирую на Python 2/3 уже 2 года, поэтому я думаю, что перешел от новичка к любителю, просто для вашего осознания. Итак, вот моя проблема, я успешно написал некоторый код AB (версия 2.6) в моей IDE. Но я ничего не мог заставить работать. То есть, даже после чтения в CSV-файл на PCollection, я не мог видеть, что это сработало. То есть он просто говорит: «Объект PCollection в 0xf3a6 ...»
Итак, я видел, как другие люди писали об этом, когда я лихорадочно гуглил, и они сказали, что вы должны использовать выражение «с», чтобы Python автоматически открывал и закрывал линию? Итак, как только я это сделал, я по крайней мере смог записать вывод того, что я только что прочитал, в файл, чтобы увидеть, что ЧТО-ТО произошло. Итак, во-первых, я нахожу действительно странным, что один и тот же код, который я написал раньше, ничего не делал, пока я не вставил его в оператор with ... что с этим? Нужно ли делать все для конвейера в операторе with? И другие определения только для обычного Python? Вот код:
def run(self, argv=None):
#p = beam.Pipeline()
with beam.Pipeline(options=PipelineOptions()) as p:
left_side = p | 'Read_Left_Side' >> beam.io.ReadFromText('/me/left_side_table.csv')
left_side | 'Write' >> beam.io.WriteToText('/me/', file_name_suffix='purple_nurple.csv')
right_side = p | 'Read_Right_Side' >> beam.io.ReadFromText('/me/right_side_table.csv')
# left_side = p | 'Read_Left_Side' >> beam.io.ReadFromText('gs://path/to/left_side.csv')
# right_side = p | 'Read_Right_Side' >> beam.io.ReadFromText('gs://path/to/right_side.csv')
hello=[1,2,3,4,5,6]|beam.Map(lambda x: 3**x)
left_side = p | 'Read' >> beam.io.ReadFromText('/me/left_side_table.csv')
left_side | 'Write' >> beam.io.WriteToText('/me/', file_name_suffix='purple_nurple.csv')
print(left_side)
right_side = p | 'Read' >> beam.io.ReadFromText('/me//right_side_table.csv')
howdy= left_side|beam.Map(lambda x: x/2)
pass