Нет пользовательского контекста доступен облачный поток данных Google? - PullRequest
4 голосов
/ 01 июня 2019

Я пытаюсь запустить объединяющий DoFn с отслеживанием состояний в облачном потоке данных Google, который ссылается на отслеживающие отслеживание DoFns в своей матрице возможностей, однако я получаю следующую ошибку:

Исключение: запрошенное выполнение отслеживающего состояние DoFn,но контекст состояния пользователя недоступен.Это, вероятно, означает, что текущий бегун не поддерживает выполнение DoFns с отслеживанием состояния.

Здесь возникает предыдущая ошибка:

@with_input_types(Dict[K, V])
@with_output_types(Dict[K, V])
class StatefulCombineDoFn(beam.DoFn):

    BUFFER = BagStateSpec(
        'buffer', 
        PickleCoder()
    )

    STATE = CombiningValueStateSpec(
        'state', 
        PickleCoder(), 
        CombineFn()
    )

    EXPIRY_TIMER = TimerSpec(
        'expiry', 
        TimeDomain.WATERMARK
    )

    def process(
            self, 
            element,
            w=beam.DoFn.WindowParam,
            buffer=beam.DoFn.StateParam(BUFFER),
            state=beam.DoFn.StateParam(STATE),
            expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)
    ):

                expiry_timer.set(w.end+self.allowed_lateness)
                buffer.add(event)
                state.add(event)

    @on_timer(EXPIRY_TIMER)
    def expiry(
        self,
        state=beam.DoFn.StateParam(STATE),
        buffer=beam.DoFn.StateParam(BUFFER)
    ):

            events = buffer.read()
            info = state.read()

            yield [(info, events)]

Как можно обойти это?

1 Ответ

1 голос
/ 03 июня 2019

К сожалению, бегун Dataflow в настоящее время не поддерживает пользовательское состояние и таймеры.Я обновлю этот ответ, как только он это сделает.

На данный момент, бегуны, которые поддерживают это: портативный бегун Flink, прямой бегун.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...