Эта проблема может возникать по нескольким причинам
- Это определение функции
sentiment
присутствует в том же файле python, что и конвейер луча. - Это определение функции
sentiment
перед тем, как она вызывается в конвейере лучей?
Я провел быструю проверку, как показано ниже, и если оба вышеперечисленных выполняются, он работает как нужно
def testing(messages):
return messages.lower()
windowed_lower_word_counts = (windowed_words
| beam.Map(lambda word: testing(word))
| "count" >> beam.combiners.Count.PerElement())
ib.show(windowed_lower_word_counts, include_window_info=True)
0 b'have' 3 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
1 b'ransom' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
2 b'let' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
3 b'me' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
Если функция определена после ее вызова, мы получим ошибку, как показано ниже
windowed_lower_word_counts = (windowed_words
| beam.Map(lambda word: testing_after(word))
| "count" >> beam.combiners.Count.PerElement())
ib.show(windowed_lower_word_counts, include_window_info=True)
ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f478f344820>, due to an exception.
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 954, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 552, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/root/apache-beam-custom/packages/beam/sdks/python/apache_beam/transforms/core.py", line 1482, in <lambda>
wrapper = lambda x: [fn(x)]
File "<ipython-input-19-f34e29a17836>", line 2, in <lambda>
| beam.Map(lambda word: testing_after_new(word))
NameError: name 'testing_after' is not defined
def testing_after(messages):
return messages.lower()
ОБНОВЛЕНИЕ
Вместо передачи функции как beam.FlatMap(lambda x : fn(x))
передать функцию как beam.FlatMap(x)
Я полагаю, что в первом случае луч пытается найти fn в рабочих машинах и не может его найти. Подробности реализации можно найти здесь - https://github.com/apache/beam/blob/fa4f4183a315f061e035d38ba2c5d4b837b371e0/sdks/python/apache_beam/transforms/core.py#L780