Поток данных - функция не вызывается - ошибка - имя не определено - PullRequest
0 голосов
/ 19 апреля 2020

Я работаю с Apache Beam в Google Dataflow, и я вызываю настрой функции через лямбда-функцию, и я получаю сообщение об ошибке, имя функции не определено.

output_tweets = (lines
                     | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
                     | 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
                     | 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
                     | 'sentiment analysis' >> beam.FlatMap(lambda x: sentiment(x))
                     )

Это было * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *. *

Я получаю следующую трассировку

  File "stream.py", line 97, in <lambda>
NameError: name 'sentiment' is not defined [while running 'generatedPtransform-441']

Есть идеи?

1 Ответ

1 голос
/ 19 апреля 2020

Эта проблема может возникать по нескольким причинам

  1. Это определение функции sentiment присутствует в том же файле python, что и конвейер луча.
  2. Это определение функции sentiment перед тем, как она вызывается в конвейере лучей?

Я провел быструю проверку, как показано ниже, и если оба вышеперечисленных выполняются, он работает как нужно

def testing(messages):
    return messages.lower()

windowed_lower_word_counts = (windowed_words
                              | beam.Map(lambda word: testing(word))
                              | "count" >> beam.combiners.Count.PerElement())

ib.show(windowed_lower_word_counts, include_window_info=True)

0   b'have'     3   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0
1   b'ransom'   1   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0
2   b'let'      1   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0
3   b'me'       1   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0

Если функция определена после ее вызова, мы получим ошибку, как показано ниже

windowed_lower_word_counts = (windowed_words
                              | beam.Map(lambda word: testing_after(word))
                              | "count" >> beam.combiners.Count.PerElement())

ib.show(windowed_lower_word_counts, include_window_info=True)

ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f478f344820>, due to an exception.
 Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 954, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 552, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/root/apache-beam-custom/packages/beam/sdks/python/apache_beam/transforms/core.py", line 1482, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "<ipython-input-19-f34e29a17836>", line 2, in <lambda>
    | beam.Map(lambda word: testing_after_new(word))
NameError: name 'testing_after' is not defined

def testing_after(messages):
    return messages.lower()

ОБНОВЛЕНИЕ

Вместо передачи функции как beam.FlatMap(lambda x : fn(x)) передать функцию как beam.FlatMap(x)

Я полагаю, что в первом случае луч пытается найти fn в рабочих машинах и не может его найти. Подробности реализации можно найти здесь - https://github.com/apache/beam/blob/fa4f4183a315f061e035d38ba2c5d4b837b371e0/sdks/python/apache_beam/transforms/core.py#L780

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...