Python функция потока данных класса DoFN, fin_bundle, запускается несколько раз и выдает пустые выходные данные - PullRequest
0 голосов
/ 27 февраля 2020

Я использую конвейер потока данных, в котором мне нужно объединить данные в один Python фрейм данных для использования на следующем шаге. Поэтому я использую класс DoFn и определяю функции __init__, process и finish_bundle, как показано ниже. Я надеюсь получить один вывод, содержащий все записи, объединенные в один фрейм данных. Я передаю этот вывод на следующий шаг в конвейере как однонаправленный ввод.

class collate_ga_data(beam.DoFn):
    def __init__(self):
        self._ga_data = pd.DataFrame()
        self.window = beam.window.GlobalWindow()
        logging.info("In INITIALIZATION :   {0}".format(self.window))

    def process(self, element,window=beam.DoFn.WindowParam):
        self.window = window
        logging.info("In PROCESS :   {0}".format(self.window))
        self._ga_data=self._ga_data.append(pd.DataFrame({k: [v] for k, v in element.items()}))

    def finish_bundle(self):
        logging.info(" The shape of ga_dataset imported  :  {0}".format(self._ga_data.shape))
        logging.info("In FINISH BUNDLE :   {0}".format(self.window))
        yield WindowedValue(self._ga_data,0,windows=[self.window])

Этот код отлично работает при использовании Directrunner и дает ожидаемые результаты, но при использовании обработчиков потока данных выдает ошибку:

File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 178, in execute
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 612, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 824, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 808, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 806, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 398, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 401, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 959, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 610, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.6/site-packages/apache_beam/transforms/sideinputs.py", line 65, in __getitem__
    _FilteringIterable(self._iterable, target_window), self._view_options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/pvalue.py", line 443, in _from_runtime_iterable
    len(head), str(head[0]), str(head[1])))
ValueError: PCollection of size 2 with more than one element accessed as a singleton view. First two elements encountered are "Empty DataFrame

Я немного покопался и обнаружил, что DoFn выдает 3 выхода - 1 - это требуемый кадр данных, а два других - пустые. Finish_bundle выдает 3 результата. Я не могу понять причину этого. Я не хочу использовать какие-либо оконные функции для этого, но в соответствии с документами выходные данные из finish_bundle должны быть оконными значениями, поэтому у меня есть глобальное окно.

Информация о регистрации из приведенного выше кода: как показано ниже:

2020-02-27T16:32:45.331291913Z  The shape of ga_dataset imported  :  (0, 0) I 
2020-02-27T16:32:45.331489801Z In FINISH BUNDLE :   GlobalWindow I 
2020-02-27T16:32:45.390583276Z  The shape of ga_dataset imported  :  (0, 0) I 
2020-02-27T16:32:45.390754222Z In FINISH BUNDLE :   GlobalWindow I 
2020-02-27T16:32:48.639126300Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.641757011Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.644909381Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.647359848Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.649686336Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.651899814Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.654145240Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.656555175Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.658823966Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.660887002Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.663397789Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.665476560Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.667604684Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.669671535Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.672025680Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.674037218Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.676348209Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.678587436Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.680708885Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.682787656Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.685523986Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.687734365Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.689816713Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.691826343Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.693920373Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.696102380Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.698341846Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.700649023Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.703155755Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.705482244Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.707590818Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.709594726Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.711608886Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.713906288Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.716273546Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.718636035Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.720866918Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.723044872Z  The shape of ga_dataset imported  :  (37, 8) I 
2020-02-27T16:32:48.723157405Z In FINISH BUNDLE :   GlobalWindow I 

В потоке данных используется только один рабочий для этого конвейера. Кто-нибудь знает, почему это происходит?

1 Ответ

3 голосов
/ 28 февраля 2020

В соответствии с моделью исполнения Beam , «Деление коллекции на связки является произвольным и выбирается бегуном». Вот почему finish_bundle может вызываться несколько раз.

Похоже, что ваша проблема может быть лучше решена с помощью CombineFn с CombineGlobally, с использованием DataFrame в качестве аккумулятора. Обратитесь к руководству по программированию Beam (4.2.4. Объединение) для получения инструкций по его реализации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...