Я пытаюсь создать ParDo с сохранением состояния в Apache Beam, который хранит данные о значениях и обновлениях, которые содержат данные из последующих окон.
Эквивалент MapState в Java.
Я попытался реализовать это с помощью пользовательского CombineFn
class DictCombineFn(beam.CombineFn):
def create_accumulator(self):
return {}
def add_input(self, accumulator, element):
accumulator[element["key"]] = element["value"]
return accumulator
def merge_accumulators(self, accumulators):
return accumulators
def extract_output(self, accumulator):
return accumulator
Который используется в CombiningValueStateSpec следующего ParDo:
class EnrichDoFn(beam.DoFn):
DICT_STATE = CombiningValueStateSpec(
'dict',
PickleCoder(),
DictCombineFn()
)
def process(
self,
element,
w=beam.DoFn.WindowParam,
dict_state=beam.DoFn.StateParam(DICT_STATE)
):
asks_state.add(element)
Однако я получаю следующую ошибку во время:
TypeError: объект '_ConcatIterable' не поддерживает назначение элементов
Я думаю, это может быть результатом использования неправильного кодера?
Какова была бы оптимальная стратегия для реализации вышеупомянутой логики?
Спасибо