Apache Beam Python: Как реализовать MapState в Python Apache Beam SDK? - PullRequest
2 голосов
/ 18 мая 2019

Я пытаюсь создать ParDo с сохранением состояния в Apache Beam, который хранит данные о значениях и обновлениях, которые содержат данные из последующих окон. Эквивалент MapState в Java. Я попытался реализовать это с помощью пользовательского CombineFn

class DictCombineFn(beam.CombineFn):
    def create_accumulator(self):
        return {}

    def add_input(self, accumulator, element):
        accumulator[element["key"]] = element["value"]
        return accumulator

    def merge_accumulators(self, accumulators):
        return accumulators

    def extract_output(self, accumulator):
        return accumulator

Который используется в CombiningValueStateSpec следующего ParDo:

class EnrichDoFn(beam.DoFn):

    DICT_STATE = CombiningValueStateSpec(
        'dict', 
        PickleCoder(), 
        DictCombineFn()
    )

    def process(
              self, 
              element,
              w=beam.DoFn.WindowParam,
              dict_state=beam.DoFn.StateParam(DICT_STATE)
        ):
           asks_state.add(element)

Однако я получаю следующую ошибку во время:

TypeError: объект '_ConcatIterable' не поддерживает назначение элементов

Я думаю, это может быть результатом использования неправильного кодера? Какова была бы оптимальная стратегия для реализации вышеупомянутой логики?

Спасибо

Ответы [ 2 ]

0 голосов
/ 28 мая 2019

Аккумуляторы слияния должны возвращать один элемент, а не повторяемый, как в вашем случае. Вы бы сделали аналогичную обработку, как в элементе add.

0 голосов
/ 18 мая 2019

Я не на 100% уверен в том, что означает эта ошибка, но кажется, что тип dict не поддерживается каким-либо образом в этом конкретном процессе. Вы пытались получить список строк, то есть «ключ: значение», а затем проанализировать и преобразовать его в dict за один выстрел?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...