Apache DoFn Stateful Beam периодически выводит все пары K / V - PullRequest
1 голос
/ 01 мая 2020

Я пытаюсь объединить (на ключ) источник потоковых данных в Apache Beam (через Scio), используя DoFn с отслеживанием состояния (используя @ProcessElement с @StateId ValueState элементами). Я думал, что это будет наиболее подходящим для проблемы, которую я пытаюсь решить. Требования:

  • для данного ключа, записи агрегированы (по существу, суммированы) за все время - Мне не нужны ранее вычисленные агрегаты, только самые последние
  • ключей могут быть исключены из состояния (state.clear()) на основании определенных условий, которыми я управляю
  • Каждые 5 минут, независимо от того, были ли обнаружены какие-либо новые ключи , все ключи , которые не были исключены из состояния , должны быть выведены

Учитывая, что это потоковый конвейер и будет работать бесконечно, используя combinePerKey По всему глобальному окну с накапливающимися запущенными панелями кажется, что он продолжит увеличивать объем памяти и объем данных, которые ему нужны для работы с течением времени, поэтому я бы хотел этого избежать. Кроме того, при тестировании этого (возможно, как и ожидалось) он просто добавляет вновь вычисленные агрегаты к выводу вместе с историческим вводом, а не использует последнее значение для каждого ключа.

Я думал, что при использовании StatefulDoFn просто позволил бы мне выводить все глобальное состояние до сих пор (), но, похоже, это не тривиальное решение. Я видел намеки на использование таймеров для искусственного выполнения обратных вызовов для этого, а также на возможном использовании медленно растущей боковой входной карты ( Как решить исключение Duplicate values ​​при создании PCollectionView > ) и как-то сбрасывать это, но это, по сути, потребует итерации по всем значениям на карте, а не присоединения к ней.

Мне кажется, что я могу пропустить что-то простое, чтобы заставить это работать. Я относительно новичок во многих концепциях управления окнами и таймерами в Beam, ищу любой совет о том, как решить эту проблему. Спасибо!

* ОБНОВЛЕНИЕ *

У меня есть закодированная концепция, но я замечаю, что даже при сбросе вывода с помощью команды do fn с отслеживанием состояния (я вижу вывод в журналах и состояния отладки в пользовательском интерфейсе потока данных) данные не выводятся ...:

Stateful Do Fn

// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]

class StatefulDoFn extends DoFnT {

  @StateId("key")
  private val keySpec = StateSpecs.value[String]()

  @StateId("domainState")
  private val domainStateSpec = StateSpecs.value[DomainState]()

  @TimerId("loopingTimer")
  private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

  @ProcessElement
  def process(
      context: DoFnT#ProcessContext,
      @StateId("key") stateKey: ValueState[String],
      @StateId("domainState") stateValue: ValueState[DomainState],
      @TimerId("loopingTimer") loopingTimer: Timer): Unit = {

    ... logic to create key/value

    if (keepState(value)) {
      loopingTimer.offset(Duration.standardSeconds(150)).setRelative()

      stateKey.write(key)
      stateValue.write(value)

      if (flushState(value)) {
        context.output(KV.of(key, value))
      }
    } else {
      stateValue.clear()
    }
  }

  @OnTimer("loopingTimer")
  def onLoopingTimer(
      context: DoFnT#OnTimerContext,
      @StateId("key") stateKey: ValueState[String],
      @StateId("domainState") stateValue: ValueState[DomainState],
      @TimerId("loopingTimer") loopingTimer: Timer): Unit = {

    ... logic to create key/value checking for nulls

    if (keepState(value)) {

      loopingTimer.offset(Duration.standardSeconds(150)).setRelative()

      if (flushState(value)) {
        context.output(KV.of(key, value))
      }
    }
  }
}

С конвейером

sc
  .pubsubSubscription(...)
  .keyBy(...)
  .withGlobalWindow(
    WindowOptions(
      trigger = AfterPane.elementCountAtLeast(1)))
  .applyPerKeyDoFn(new StatefulDoFn())
  .withFixedWindows(
    duration = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      trigger = AfterWatermark.pastEndOfWindow(),
      allowedLateness = Duration.ZERO,
      // Only take the latest per key during a window
      timestampCombiner = TimestampCombiner.LATEST
    ))
  .saveAsCustomOutput(TextIO.write()...)

1 Ответ

1 голос
/ 07 мая 2020

Вы правы, что Stateful DoFn поможет вам здесь. Это базовый c набросок того, что вы можете сделать. Обратите внимание, что это только выводит сумму без ключа. Это может быть не совсем то, что вы хотите, но это должно помочь вам двигаться вперед.

class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> {

  @TimerId("emitter")
  private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @StateId("done")
  private final StateSpec<ValueState<Boolean>> doneState = StateSpecs.value();

  @StateId("agg")
  private final StateSpec<CombiningState<Integer, int[], Integer>>
      aggSpec = StateSpecs.combining(
          Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) throws Exception {
        if (SOME CONDITION) {
          countValueState.clear();
          doneState.write(true);
        } else {
          countValueState.addAccum(c.element().getValue());
          emitterTimer.align(Duration.standardMinutes(5)).setRelative();
        }
      }
    }

  @OnTimer("emitter")
  public void onEmit(
      OnTimerContext context,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) {
      Boolean isDone = doneState.read();
      if (isDone != null && isDone) {
        return;
      } else {
        context.output(aggState.getAccum());
        // Set the timer to emit again
        emitterTimer.align(Duration.standardMinutes(5)).setRelative();
      }
    }
  }
  }

Рад повторить с вами кое-что, что сработает.

...