Beam / Dataflow: понимание, почему пакетное задание останавливается перед шагом Stateful Pardo - PullRequest
0 голосов
/ 15 апреля 2020

У меня есть конвейер, который по существу читает сжатые данные JSON, выполняет небольшую обработку и записывает их через массовый запрос API. Он выполняет sh массовый запрос API, у меня есть DoFn, который буферизует определенный объем данных (скажем, 1500 записей) и сбрасывает пакет, когда он достигает этого порога ( в значительной степени точно так же, как это происходит в блоге Beam). post )

Способ выполнения конвейера в потоке данных заключается в том, что он читает ВСЕ данные JSON (почти на 1 ТБ) перед переходом к этапу пакетной обработки. Есть идеи, что здесь происходит? Промежуточный шаг «Добавить случайный ключ», по существу, принимает строку JSON и выводит KV (random.nextInt(), jsonString), которое было добавлено для предотвращения слияния в секцию вызовов пакета + API

enter image description here

Редактировать: добавление фрагмента кода для раздела «Буфер»:

new DoFn<KV<String, String>, KV<String, String>>() {

  private static final int MAX_BUFFER_SIZE = 500;
  private static final Duration MAX_BUFFER_DURATION = Duration.standardSeconds(1);

  @TimerId("stale")
  private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferedEvents = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<KV<String, String>> bufferState,
      @TimerId("stale") Timer staleTimer,
      @StateId("count") ValueState<Integer> countState) {
    if (firstNonNull(countState.read(), 0) == 0) {
      staleTimer.offset(MAX_BUFFER_DURATION).setRelative();
    }
    int count = firstNonNull(countState.read(), 0);
    count = count + 1;
    countState.write(count);
    bufferState.add(context.element().value());

    if (count >= MAX_BUFFER_SIZE) {
      for (KV<String, String> enrichedEvent : enrichEvents(bufferState.read())) {
        context.output(KV.of(RandomStringUtils.randomNumeric(1), enrichedEvent));
      }
      bufferState.clear();
      countState.clear();
    }
  }



  @OnTimer("stale")
  public void onStale(
      OnTimerContext context,
      @StateId("buffer") BagState<Event> bufferState,
      @StateId("count") ValueState<Integer> countState) {
    if (!bufferState.isEmpty().read()) {
      for (String enrichedEvent : enrichEvents(bufferState.read())) {
        context.output(KV.of(RandomStringUtils.randomNumeric(1), enrichedEvent));
      }
      bufferState.clear();
      countState.clear();
    }
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...