Проблема начала появляться на выходных.По некоторым причинам, это чувствует, что проблема DataFlow.
Раньше мне удавалось выполнять скрипт и писать записи TF просто отлично.Однако теперь я не могу инициализировать граф вычислений для обработки данных.
След:
Traceback (most recent call last):
File "my_script.py", line 1492, in <module>
MyBeamClass()
File "my_script.py", line 402, in __init__
self.run()
File "my_script.py", line 514, in run
transform_fn_io.WriteTransformFn(path=self.JOB_DIR + '/transform/'))
File "/anaconda3/envs/ml27/lib/python2.7/site-packages/apache_beam/pipeline.py", line 426, in __exit__
self.run().wait_until_finish()
File "/anaconda3/envs/ml27/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1238, in wait_until_finish
(self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 176, in execute
op.start()
File "apache_beam/runners/worker/operations.py", line 531, in apache_beam.runners.worker.operations.DoOperation.start
def start(self):
File "apache_beam/runners/worker/operations.py", line 532, in apache_beam.runners.worker.operations.DoOperation.start
with self.scoped_start_state:
File "apache_beam/runners/worker/operations.py", line 533, in apache_beam.runners.worker.operations.DoOperation.start
super(DoOperation, self).start()
File "apache_beam/runners/worker/operations.py", line 202, in apache_beam.runners.worker.operations.Operation.start
def start(self):
File "apache_beam/runners/worker/operations.py", line 206, in apache_beam.runners.worker.operations.Operation.start
self.setup()
File "apache_beam/runners/worker/operations.py", line 480, in apache_beam.runners.worker.operations.DoOperation.setup
with self.scoped_start_state:
File "apache_beam/runners/worker/operations.py", line 485, in apache_beam.runners.worker.operations.DoOperation.setup
pickler.loads(self.spec.serialized_fn))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 247, in loads
return dill.loads(s)
File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 317, in loads
return load(file, ignore)
File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 305, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 864, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1232, in load_build
for k, v in state.iteritems():
AttributeError: 'str' object has no attribute 'iteritems'
Я использую tennflowflow == 1.13.1 и tenorflow-transform == 0.9.0 и apache_beam == 2.7.0
with beam.Pipeline(options=self.pipe_opt) as p:
with beam_impl.Context(temp_dir=self.google_cloud_options.temp_location):
# rest of the script
_ = (
transform_fn
| 'WriteTransformFn' >>
transform_fn_io.WriteTransformFn(path=self.JOB_DIR + '/transform/'))