Wait.On () в версии Apache Beam Python SDK - PullRequest
0 голосов
/ 09 ноября 2019

Я использую Apache Beam на Python и хотел бы спросить, что эквивалентно Apache Beam Java Wait.on() на Python SDK?

В настоящее время у меня проблема с этим фрагментом кода ниже

    if len(output_pcoll) > 1:
        merged = (tuple(output_pcoll) |
                  'MergePCollections1' >> beam.Flatten())
    else:
        merged = output_pcoll[0]

    outlier_side_input = self.construct_outlier_side_input(merged)

    (merged |
     "RemoveOutlier" >>
     beam.ParDo(utils.Remove_Outliers(),
                beam.pvalue.AsDict(outlier_side_input)) |
     "WriteToCSV" >>
     beam.io.WriteToText('../../ML-DATA/{0}.{1}'.format(self.BUCKET,
                         self.OUTPUT), num_shards=1))

кажется, что Apache Beam не ожидает завершения кода на self.construct_outlier_side_input и приводит к вводу пустой стороны при выполнении «RemoveOutlier» в следующем конвейере. В Java-версии вы можете использовать Wait.On() для ожидания завершения construct_outlier_side_input, однако я не смог найти эквивалентный метод в Python SDK.

- Правка - я пытаюсь добиться этогопочти так же, как в этой ссылке, https://rmannibucau.metawerx.net/post/apache-beam-initialization-destruction-task

1 Ответ

0 голосов
/ 10 ноября 2019

Для этого вы можете использовать дополнительную функцию вывода Beam.

Пример кода:

results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
           .with_outputs('above_cutoff_lengths', 'marked strings',
                         main='below_cutoff_strings'))
below = results.below_cutoff_strings
above = results.above_cutoff_lengths
marked = results['marked strings']  # indexing works as well

После запуска приведенного выше фрагмента кода вы получаете несколько вариантов PC, таких какниже, сверху и помечено. Затем вы можете использовать боковые входы для дальнейшей фильтрации или объединения результатов

Надеюсь, что это поможет.

Обновление

На основе комментариев я хотел бы упомянуть, что Apache Beam имеет возможностивыполнить обработку с сохранением состояния с помощью ValueState и BagState. Если требование состоит в том, чтобы прочитать PCollection и затем принять решение на основе наличия предшествующего значения или нет, то такие требования могут быть обработаны с помощью BagState, как показано ниже: -

def process(self,
              element,
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam,
              buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
              buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
              watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):

    # Do you processing here
    key, value = element
    # Read all the data from buffer1
    all_values_in_buffer_1 = [x for x in buffer_1.read()]

    if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
        # clear the buffer data if required conditions are met.
        buffer_1.clear()

    # add the value to buffer 2
    buffer_2.add(value)

    if StatefulDoFn._all_condition_met():
      # Clear the timer if certain condition met and you don't want to trigger
      # the callback method.
      watermark_timer.clear()

    yield element

  @on_timer(WATERMARK_TIMER)
  def on_expiry_1(self,
                  timestamp=beam.DoFn.TimestampParam,
                  window=beam.DoFn.WindowParam,
                  key=beam.DoFn.KeyParam,
                  buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                  buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
    # Window and key parameters are really useful especially for debugging issues.
    yield 'expired1'
...