Вы исполняете свой файл Python Beam, как и любой обычный файл, при условии, что вы указали свой конвейер как DirectRunner, что вы и сделали с
p = beam.Pipeline('DirectRunner')
. В настоящее время Apache Beam имеет ограниченную поддержку Python 3.x.Если вы попытаетесь выполнить пример подсчета слов , это приведет к той же ошибке.Это будет исправлено в будущем, так как в настоящее время они работают над полной поддержкой Python 3.
Когда вы хотите развернуть свой код Python Beam с помощью Google Cloud Platform, я настоятельно рекомендую перейти на Python 2.7..
Вы можете отслеживать проблемы здесь
Однако я не могу сказать, что именно делает ваша функция Split, поэтому приведу минимальный рабочий пример, чтобы вы моглипротестируйте установку Beam.
import apache_beam as beam
import ast
# The DoFn to perform on each element in the input PCollection.
class Split(beam.DoFn):
def process(self, element):
val = ast.literal_eval(element[1])
output ='('+','.join(map(str, val.values())) + ')'
return [output]
def run():
p = beam.Pipeline('DirectRunner')
(p | 'ReadMessage' >> beam.io.textio.ReadFromTextWithFilename('input/inputs.json')
| 'Processing' >> beam.ParDo(Split())
| 'Write' >> beam.io.WriteToText('input/results.txt'))
result = p.run()
result.wait_until_finish()
if __name__ == "__main__":
run()