Только что перенес мой конвейер с Python 2,7 на 3,6 и Apache Beam 2.17. (То же самое происходит с Apache Beam 2.16)
Хотелось бы лучше понять область действия метода Python 3.x в DataFlow / Apache Beam.
Код здесь :
При вызове beam.FlatMap
внутри main
метода:
# Window them, and batch them into batches of 50
output_tweets = (lines
| 'assign window key' >> beam.WindowInto(
window.FixedWindows(10))
| 'batch into n batches' >> BatchElements(
min_batch_size=99, max_batch_size=100)
| 'predict sentiment' >> beam.FlatMap(
lambda messages: prediction_handler(messages))
)
В Python 2.x моя функция prediction_handler
была определена до этой части кода и работала просто хорошо, теперь я получаю:
INFO:root:2020-01-11T07:55:54.790Z: JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -229: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 441, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "twitter_dataflow.py", line 210, in <lambda>
NameError: name 'prediction_handler' is not defined
Если я создаю функцию "main" и определяю все функции внутри этой, это, кажется, работает, это ожидаемое поведение? То же самое происходит для любого метода, даже если они были объявлены ранее, и в качестве обходного пути мне нужно было переместить все функции внутри main
:
- initialize_api
- aggregate_format
- предсказание
- предсказание_хандлер
Функции были определены ранее. Уже изучил этот вопрос.