Проблема обновления Apache Beam Python SDK - PullRequest
1 голос
/ 11 марта 2019

Я только что обновился с apache-beam==2.5.0 до apache-beam[gcp]==2.11.0 и теперь получаю следующую ошибку:

 Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 178, in execute
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 549, in apache_beam.runners.worker.operations.DoOperation.finish
    def finish(self):
  File "apache_beam/runners/worker/operations.py", line 550, in apache_beam.runners.worker.operations.DoOperation.finish
    with self.scoped_finish_state:
  File "apache_beam/runners/worker/operations.py", line 551, in apache_beam.runners.worker.operations.DoOperation.finish
    self.dofn_runner.finish()
  File "apache_beam/runners/common.py", line 758, in apache_beam.runners.common.DoFnRunner.finish
    self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 777, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 750, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    bundle_method()
  File "apache_beam/runners/common.py", line 361, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    def invoke_finish_bundle(self):
  File "apache_beam/runners/common.py", line 364, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    self.output_processor.finish_bundle_outputs(
  File "apache_beam/runners/common.py", line 887, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 130, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.update_counters_start(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 110, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
    self.opcounter.update_from(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 195, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
    self.do_sample(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "apache_beam/coders/coder_impl.py", line 991, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    def get_estimated_size_and_observables(self, value, nested=False):
  File "apache_beam/coders/coder_impl.py", line 1007, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    self._windows_coder.estimate_size(value.windows, nested=True))
  File "apache_beam/coders/coder_impl.py", line 791, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
    self.get_estimated_size_and_observables(value))
  File "apache_beam/coders/coder_impl.py", line 805, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
    self._elem_coder.get_estimated_size_and_observables(
  File "apache_beam/coders/coder_impl.py", line 145, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
    return self.estimate_size(value, nested), []
  File "apache_beam/coders/coder_impl.py", line 487, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
    typed_value = value
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'output_timings/Write/WriteImpl/WriteBundles/WriteBundles']

Результаты конвейера я сохраняю в Google Cloud Storage примерно таким кодом:

## OUTPUT
# Run every element of joint_data through the WriteAsJSON DoFn, which emits
# JSON strings (see its process() method).
json_output = joint_data | "json_output" >> beam.ParDo(WriteAsJSON())
# Write those strings with WriteToText, using known_args.output as the base
# location and "/dataset.json-lines" as the file name prefix.
output = json_output | "output" >> WriteToText(known_args.output + "/dataset.json-lines")


# Launch the pipeline, then wait for the run to finish before continuing.
result = p.run()
result.wait_until_finish()

Здесь WriteAsJSON выглядит примерно так:

@th.with_input_types(th.Any)
@th.with_output_types(six.text_type)
class WriteAsJSON(beam.DoFn):
    """Turn one joined ``(visitor_address, grouped_values)`` element into JSON.

    ``process`` returns a one-item list holding the serialized record, or an
    empty list when the element has no 'visit' or no 'day.mean_pages' values.
    """

    @checklist_item("variable day.mean_pages")
    def process(self, element):
        """Build the flat output record for ``element`` and dump it as JSON.

        Args:
            element: a 2-item sequence ``(visitor_address, joint_dict)`` where
                ``joint_dict`` is indexed by 'visit', 'day.mean_pages' and
                'country'; each value is expected to support ``len()`` and
                ``[0]`` (presumably lists produced by a join -- confirm
                against the upstream transform).

        Returns:
            ``[json_string]`` on success, ``[]`` when either the 'visit' or the
            'day.mean_pages' group is empty.
        """
        visitor_address, grouped = element

        visits = grouped['visit']
        mean_pages = grouped['day.mean_pages']

        # Guard clause: both groups must contain at least one value.
        if not (len(mean_pages) and len(visits)):
            return []

        first_visit = visits[0]
        countries = grouped['country']

        # Start from the first visit's variables, then layer the rest on top.
        record = {name: value for (name, value) in first_visit.items()}

        # Only the first 'day.mean_pages' value is kept.
        record['day.mean_pages'] = mean_pages[0]
        record['visitor_address'] = visitor_address

        # Country data is optional; fall back to nulls when it is absent.
        if len(countries) > 0:
            first_country = countries[0]
            iso2 = first_country['iso2']
            distance = first_country['ip.geographical_distance']
        else:
            iso2 = None
            distance = None
        record['visitor_iso2'] = iso2
        record['ip.geographical_distance'] = distance

        return [json.dumps(record)]

Изменилось ли что-то в API apache-beam, что привело к этой поломке? Я прочитал примечания к выпускам, но ничего об этом там не нашёл. Буду благодарен за любую помощь. Заранее спасибо.

Я посмотрел вопрос «Как создать конвейер Dataflow из Pub/Sub в GCS на Python» и попробовал откатиться на apache-beam[gcp]==2.9.0, но в моём случае это не помогло.

...