Для этого вы можете использовать дополнительную функцию вывода Beam.
Пример кода:
results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
.with_outputs('above_cutoff_lengths', 'marked strings',
main='below_cutoff_strings'))
below = results.below_cutoff_strings
above = results.above_cutoff_lengths
marked = results['marked strings'] # indexing works as well
После запуска приведенного выше фрагмента кода вы получаете несколько вариантов PC, таких какниже, сверху и помечено. Затем вы можете использовать боковые входы для дальнейшей фильтрации или объединения результатов
Надеюсь, что это поможет.
Обновление
На основе комментариев я хотел бы упомянуть, что Apache Beam имеет возможностивыполнить обработку с сохранением состояния с помощью ValueState
и BagState
. Если требование состоит в том, чтобы прочитать PCollection и затем принять решение на основе наличия предшествующего значения или нет, то такие требования могут быть обработаны с помощью BagState
, как показано ниже: -
def process(self,
element,
timestamp=beam.DoFn.TimestampParam,
window=beam.DoFn.WindowParam,
buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):
# Do you processing here
key, value = element
# Read all the data from buffer1
all_values_in_buffer_1 = [x for x in buffer_1.read()]
if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
# clear the buffer data if required conditions are met.
buffer_1.clear()
# add the value to buffer 2
buffer_2.add(value)
if StatefulDoFn._all_condition_met():
# Clear the timer if certain condition met and you don't want to trigger
# the callback method.
watermark_timer.clear()
yield element
@on_timer(WATERMARK_TIMER)
def on_expiry_1(self,
timestamp=beam.DoFn.TimestampParam,
window=beam.DoFn.WindowParam,
key=beam.DoFn.KeyParam,
buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
# Window and key parameters are really useful especially for debugging issues.
yield 'expired1'