Я только что обновился с apache-beam==2.5.0 до apache-beam[gcp]==2.11.0 и теперь получаю следующую ошибку:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 178, in execute
op.finish()
File "apache_beam/runners/worker/operations.py", line 549, in apache_beam.runners.worker.operations.DoOperation.finish
def finish(self):
File "apache_beam/runners/worker/operations.py", line 550, in apache_beam.runners.worker.operations.DoOperation.finish
with self.scoped_finish_state:
File "apache_beam/runners/worker/operations.py", line 551, in apache_beam.runners.worker.operations.DoOperation.finish
self.dofn_runner.finish()
File "apache_beam/runners/common.py", line 758, in apache_beam.runners.common.DoFnRunner.finish
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 777, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 750, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
bundle_method()
File "apache_beam/runners/common.py", line 361, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
def invoke_finish_bundle(self):
File "apache_beam/runners/common.py", line 364, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
self.output_processor.finish_bundle_outputs(
File "apache_beam/runners/common.py", line 887, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 130, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.update_counters_start(windowed_value)
File "apache_beam/runners/worker/operations.py", line 110, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
self.opcounter.update_from(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 195, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
self.do_sample(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
File "apache_beam/coders/coder_impl.py", line 991, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
def get_estimated_size_and_observables(self, value, nested=False):
File "apache_beam/coders/coder_impl.py", line 1007, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
self._windows_coder.estimate_size(value.windows, nested=True))
File "apache_beam/coders/coder_impl.py", line 791, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
self.get_estimated_size_and_observables(value))
File "apache_beam/coders/coder_impl.py", line 805, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
self._elem_coder.get_estimated_size_and_observables(
File "apache_beam/coders/coder_impl.py", line 145, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
return self.estimate_size(value, nested), []
File "apache_beam/coders/coder_impl.py", line 487, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
typed_value = value
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'output_timings/Write/WriteImpl/WriteBundles/WriteBundles']
У меня есть код, который сохраняет результаты конвейера в Google Cloud Storage, например:
# ---- Output stage ----
# Each joined record is first turned into a JSON string by WriteAsJSON,
# then the strings are handed to WriteToText under the user-supplied
# output location.
# NOTE(review): the traceback reported with this snippet fails inside the
# 'WriteImpl/WriteBundles' step of a write transform with a GlobalWindow vs.
# interval-window coder mismatch -- confirm how the upstream collection
# (joint_data) is windowed before it reaches this stage.
json_output = joint_data | (
    "json_output" >> beam.ParDo(WriteAsJSON())
)
output = json_output | (
    "output" >> WriteToText(known_args.output + "/dataset.json-lines")
)

# Launch the pipeline and block until it has finished.
result = p.run()
result.wait_until_finish()
где WriteAsJSON выглядит примерно так:
@th.with_input_types(th.Any)
@th.with_output_types(six.text_type)
class WriteAsJSON(beam.DoFn):
    """Flatten one joined visitor record into a single JSON string."""

    @checklist_item("variable day.mean_pages")
    def process(self, element):
        """Build the JSON line for one visitor.

        Args:
            element: a ``(visitor_address, joint_dict)`` pair. ``joint_dict``
                is read under the keys ``'visit'``, ``'day.mean_pages'`` and
                ``'country'``; each value is treated as a sequence of which
                only the first item is used (presumably a grouped-join
                result -- confirm against the upstream transform).

        Returns:
            A one-element list holding the JSON-encoded record, or an empty
            list when either the ``'visit'`` or the ``'day.mean_pages'``
            group is empty (the element is dropped).
        """
        visitor_address, joint_dict = element
        visit_variables = joint_dict['visit']
        day_mean_pages = joint_dict['day.mean_pages']

        # Both groups are required; without them nothing is emitted.
        if not day_mean_pages or not visit_variables:
            return []

        # Lookup order matches the data flow: first visit row, then country.
        first_visit = visit_variables[0]
        visitor_country_datum = joint_dict['country']

        # Start from a copy of the visit variables, then layer the derived
        # fields on top (a later key overwrites an equally named visit key).
        record = dict(first_visit)
        record['day.mean_pages'] = day_mean_pages[0]
        record['visitor_address'] = visitor_address

        # Country data is optional: emit explicit nulls when it is missing
        # so every output line carries the same set of keys.
        if visitor_country_datum:
            country = visitor_country_datum[0]
            record['visitor_iso2'] = country['iso2']
            record['ip.geographical_distance'] = (
                country['ip.geographical_distance'])
        else:
            record['visitor_iso2'] = None
            record['ip.geographical_distance'] = None

        return [json.dumps(record)]
Изменилось ли что-то в API apache-beam, что привело к этой поломке? Я прочитал примечания к выпускам, но ничего об этом не нашёл. Буду благодарен за любую помощь. Заранее спасибо.
Я посмотрел вопрос «Как создать конвейер Dataflow из Pub/Sub в GCS на Python» и попробовал откатиться до apache-beam[gcp]==2.9.0, но в моём случае это не помогло.