Таймер обратного вызова DoFn не может получить доступ - PullRequest
0 голосов
/ 18 июня 2019

Я пишу преобразование GroupIntoBatches в Python, которое имитирует его аналог в JAVA.

class GroupIntoBatches(PTransform):
  def __init__(self, batch_size):
    self.batch_size = batch_size

  @staticmethod
  def of_size(batch_size):
    return GroupIntoBatches(batch_size)

  def expand(self, pcoll):
    input_coder = coders.registry.get_coder(pcoll)
    if not input_coder.is_kv_coder():
      raise ValueError('coder specified in the input \
      PCollection is not a KvCoder')

    return pcoll | ParDo(_pardo_group_into_batches(self.batch_size, input_coder))


def _pardo_group_into_batches(batch_size, input_coder):
  ELEMENT_STATE = BagStateSpec('values', input_coder)
  COUNT_STATE = CombiningValueStateSpec('count', input_coder, CountCombineFn())
  EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)

  class _GroupIntoBatchesDoFn(DoFn):

    def process(self, element, 
                window=DoFn.WindowParam, 
                element_state=DoFn.StateParam(ELEMENT_STATE), 
                count_state=DoFn.StateParam(COUNT_STATE), 
                expiry_timer=DoFn.TimerParam(EXPIRY_TIMER)):
      # Allowed lateness not supported in Python SDK
      # https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data      
      expiry_timer.set(window.max_timestamp())
      element_state.add(element)
      count_state.add(1)
      count = count_state.read()
      if count >= batch_size:
        batch = [element for element in element_state.read()]
        yield batch
        element_state.clear()
        count_state.clear()

    @on_timer(EXPIRY_TIMER)
    def expiry(self, element_state=DoFn.StateParam(ELEMENT_STATE), count_state=DoFn.StateParam(COUNT_STATE)):
        batch = [element for element in element_state.read()]
        yield batch
        element_state.clear()
        count_state.clear()

  return _GroupIntoBatchesDoFn()

Проблема в середине выполнения этого кода, я получаю ошибку

KeyError: '\ x1a; \ n-GroupIntoBatches / ParDo (_GroupIntoBatchesDoFn) \ x12 \ x05count "\ x03key'

После некоторой отладки я понял, что это из-за функции expiry последнейстрока count_state.clear(). В этот момент бегун удаляет объект состояния.

В настоящее время я не очень хорошо понимаю BEAM, поэтому я ищу кого-то, кто мог бы направить меня в правильном направлении.если я что-то здесь упускаю или делаю что-то не так.

Вот аналог JAVA [https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java 1

...