GCP поток данных с питоном. «AttributeError: Не удалось получить атрибут« _JsonSink »в модуле« dataflow_worker.start » - PullRequest
0 голосов
/ 16 октября 2019

Я новичок в потоке данных GCP.

Я пытаюсь прочитать текстовые файлы (однострочная строка JSON) в формат JSON из облачного хранилища GCP, затем разделить его на основе значений определенного поля и вывести в GCPоблачное хранилище (в виде строкового текстового файла JSON).

Вот мой код

Однако я сталкиваюсь с некоторой ошибкой в ​​потоке данных GCP:

Traceback (most recent call last):
  File "main.py", line 169, in <module>
    run()
  File "main.py", line 163, in run
    shard_name_template='')
  File "C:\ProgramData\Miniconda3\lib\site-packages\apache_beam\pipeline.py", line 426, in __exit__
    self.run().wait_until_finish()
  File "C:\ProgramData\Miniconda3\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 1346, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 287, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 410, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute '_JsonSink' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py'>

Я могу запустить этот скрипт локально, но он не работает, когда я пытаюсь использовать dataflowRunner

Пожалуйста, дайте мне несколько советов.

PS. версия apache-beam: 2.15.0

[Update1]

Я пробую предложение @Yueyang Qiu, добавьте

pipeline_options.view_as(SetupOptions).save_main_session = True

Приведенная ссылка говорит:

DoFn в этом рабочем процессе зависит от глобального контекста (например, модуль, импортированный на уровне модуля)

Эта ссылка поддерживает предложенное выше.

Однако произошла та же ошибка.

Итак, я думаю, что моя реализация _JsonSink (наследовать от filebasedsink.FileBasedSink) неправильная или что-то еще нужно добавить.

Любое мнение будет оценено, спасибо всем!

Ответы [ 3 ]

0 голосов
/ 14 ноября 2019

Используя указания здесь , мне удалось запустить ваш пример.

Структура каталогов:

./setup.py
./dataflow_json
./dataflow_json/dataflow_json.py  (no change from your example)
./dataflow_json/__init__.py  (empty file)
./main.py

setup.py:

import setuptools

setuptools.setup(
  name='dataflow_json',
  version='1.0',
  install_requires=[],
  packages=setuptools.find_packages(),
)

main.py:

from __future__ import absolute_import

from dataflow_json import dataflow_json

if __name__ == '__main__':
    dataflow_json.run()

и вы запускаете конвейер с помощью python main.py.

В основном происходит то, что флаг '--setup_file=./setup.py' говорит Beam создать пакет иустановите его на удаленном работнике Dataflow. Файл __init__.py необходим для того, чтобы setuptools идентифицировал каталог dataflow_json/ как пакет.

0 голосов
/ 15 ноября 2019

Я наконец-то обнаружил проблему:

класс '_jsonsink', который я реализую, используя некоторые функции из Python3

Однако я не знаю, какую версию Python я использую для 'Dataflowrunner '(На самом деле, я не выяснил, как указать версию Python для обработчика потока данных на GCP. Есть предложения?)

Следовательно, я переписываю свой код в Python2-совместимую версию, все работает отлично!

Спасибо всем вам!

0 голосов
/ 18 октября 2019

Можете ли вы попробовать установить параметр save_main_session = True как здесь: https://github.com/apache/beam/blob/a2b0ad14f1525d1a645cb26f5b8ec45692d9d54e/sdks/python/apache_beam/examples/cookbook/coders.py#L88.

...