Есть два способа сделать это.
Первый - сделать GroupByKey
и иметь триггер, который срабатывает после каждого отдельного элемента. Примерно так:
keys_together_pcoll = (rekeyed_pcoll
| beam.WindowInto(window.GlobalWindows()
trigger=AfterCount(1))
| beam.GroupByKey()
| beam.FlatMap(lambda x: x[1]))
result_pcoll = (keys_together_pcoll
| beam.ParDo(DoFnWithElementsInCorrespondingWorkers()))
Конечно, это немного неловко.
Еще один способ сделать это - сделать ваш DoFn с состоянием. Это заставит бегуна перетасовать элементы в своих соответствующих работников ключом. Примерно так:
class DoFnWithElementsInCorrespondingWorkers(beam.DoFn):
UNUSED_STATE = BagStateSpec('unused', VarIntCoder())
def process(self,
element,
unused=beam.DoFn.StateParam(UNUSED_STATE)):
# .. My processing
result_pcoll = (rekeyed_pcoll
| beam.ParDo(DoFnWithElementsInCorrespondingWorkers()))
Почему это происходит?
Помните, что в Beam (и Flink, и в аналогичных системах) состояние организовано по ключу , поэтому, если вы вставите DoFn с отслеживанием состояния, Beam распознает, что элементы необходимо перетасовать в правильных работников в соответствии с к их ключам.