Я пытаюсь запустить объединяющий DoFn с отслеживанием состояний в облачном потоке данных Google, который ссылается на отслеживающие отслеживание DoFns в своей матрице возможностей, однако я получаю следующую ошибку:
Исключение: запрошенное выполнение отслеживающего состояние DoFn,но контекст состояния пользователя недоступен.Это, вероятно, означает, что текущий бегун не поддерживает выполнение DoFns с отслеживанием состояния.
Здесь возникает предыдущая ошибка:
@with_input_types(Dict[K, V])
@with_output_types(Dict[K, V])
class StatefulCombineDoFn(beam.DoFn):
BUFFER = BagStateSpec(
'buffer',
PickleCoder()
)
STATE = CombiningValueStateSpec(
'state',
PickleCoder(),
CombineFn()
)
EXPIRY_TIMER = TimerSpec(
'expiry',
TimeDomain.WATERMARK
)
def process(
self,
element,
w=beam.DoFn.WindowParam,
buffer=beam.DoFn.StateParam(BUFFER),
state=beam.DoFn.StateParam(STATE),
expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)
):
expiry_timer.set(w.end+self.allowed_lateness)
buffer.add(event)
state.add(event)
@on_timer(EXPIRY_TIMER)
def expiry(
self,
state=beam.DoFn.StateParam(STATE),
buffer=beam.DoFn.StateParam(BUFFER)
):
events = buffer.read()
info = state.read()
yield [(info, events)]
Как можно обойти это?