Как запустить Apache Beam локально? - PullRequest
0 голосов
/ 02 апреля 2019

Я пытаюсь запустить скрипт Python Apache Beam на моей локальной машине, чтобы провести некоторую симуляцию.Я добавил 'DirectRunner' в мои настройки.Однако p.run () выдает мне ошибку «TypeError: Receiver () не принимает аргументов»

Есть идеи, почему это произойдет?Я использую Spyder в качестве IDE.

РЕДАКТИРОВАТЬ: Вот пример кода, он принимает список сообщений в виде:

{ "Val_1": 1, "Val_2": 56, "date": "2019-04-01T15:00:04.340778" }

разбить его и положить в видеиз

(1, 56, 2019-04-01T15:00:04.340778)

, затем сохраните его в текстовом файле.

p = beam.Pipeline('DirectRunner')
(p | 'ReadMessage' >>  beam.io.textio.ReadFromTextWithFilename('input/inputs.json')
                    | 'Processing' >> beam.ParDo(Split())
                    | 'Write' >> beam.io.WriteToText('input/results.txt'))
p.run().wait_until_finish() 

Ошибка:

"TypeError: Receiver() takes no arguments"

1 Ответ

0 голосов
/ 09 апреля 2019

Вы исполняете свой файл Python Beam, как и любой обычный файл, при условии, что вы указали свой конвейер как DirectRunner, что вы и сделали с

p = beam.Pipeline('DirectRunner')

. В настоящее время Apache Beam имеет ограниченную поддержку Python 3.x.Если вы попытаетесь выполнить пример подсчета слов , это приведет к той же ошибке.Это будет исправлено в будущем, так как в настоящее время они работают над полной поддержкой Python 3.

Когда вы хотите развернуть свой код Python Beam с помощью Google Cloud Platform, я настоятельно рекомендую перейти на Python 2.7..

Вы можете отслеживать проблемы здесь

Однако я не могу сказать, что именно делает ваша функция Split, поэтому приведу минимальный рабочий пример, чтобы вы моглипротестируйте установку Beam.

import apache_beam as beam
import ast

# The DoFn to perform on each element in the input PCollection.
class Split(beam.DoFn):
    def process(self, element):
        val = ast.literal_eval(element[1])
        output ='('+','.join(map(str, val.values())) + ')'
        return [output]

def run():
    p = beam.Pipeline('DirectRunner')
    (p | 'ReadMessage' >>  beam.io.textio.ReadFromTextWithFilename('input/inputs.json')
                        | 'Processing' >> beam.ParDo(Split())
                        | 'Write' >> beam.io.WriteToText('input/results.txt'))
    result = p.run()
    result.wait_until_finish()

if __name__ == "__main__":
    run()
...