Я новичок в потоке данных GCP.
Я пытаюсь прочитать текстовые файлы (однострочная строка JSON) в формат JSON из облачного хранилища GCP, затем разделить его на основе значений определенного поля и вывести в GCPоблачное хранилище (в виде строкового текстового файла JSON).
Вот мой код
Однако я сталкиваюсь с некоторой ошибкой в потоке данных GCP:
Traceback (most recent call last):
File "main.py", line 169, in <module>
run()
File "main.py", line 163, in run
shard_name_template='')
File "C:\ProgramData\Miniconda3\lib\site-packages\apache_beam\pipeline.py", line 426, in __exit__
self.run().wait_until_finish()
File "C:\ProgramData\Miniconda3\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 1346, in wait_until_finish
(self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 773, in run
self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
pickler.load_session(session_file)
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 287, in load_session
return dill.load_session(file_path)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 410, in load_session
module = unpickler.load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 474, in find_class
return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute '_JsonSink' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py'>
Я могу запустить этот скрипт локально, но он не работает, когда я пытаюсь использовать dataflowRunner
Пожалуйста, дайте мне несколько советов.
PS. версия apache-beam: 2.15.0
[Update1]
Я пробую предложение @Yueyang Qiu, добавьте
pipeline_options.view_as(SetupOptions).save_main_session = True
Приведенная ссылка говорит:
DoFn в этом рабочем процессе зависит от глобального контекста (например, модуль, импортированный на уровне модуля)
Эта ссылка поддерживает предложенное выше.
Однако произошла та же ошибка.
Итак, я думаю, что моя реализация _JsonSink (наследовать от filebasedsink.FileBasedSink) неправильная или что-то еще нужно добавить.
Любое мнение будет оценено, спасибо всем!