Проблемы с обработкой с сохранением состояния в Apache Beam - PullRequest
0 голосов
/ 26 апреля 2018

Итак, я прочитал Stateful-обработку луча и своевременную обработку статей и обнаружил проблемы с реализацией функций как таковых.

Проблема, которую я пытаюсь решить, похожа на this для генерации последовательного индекса для каждой строки.Так как я хочу иметь возможность ссылаться на строку, созданную потоком данных, на исходный источник.

public static class createIndex extends DoFn<String, KV<String, String>> {
  @StateId("count")
  private final StateSpec<ValueState<Long>> countState = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(ProcessContext c, @StateId("count") ValueState<Long> countState)  {

    String val = c.element();
    long count = 0L;
    if(countState.read() != null)
      count = countState.read();

    count = count + 1;
    countState.write(count);

    c.output(KV.of(String.valueOf(count), val));

  }
}

Pipeline p = Pipeline.create(options);

p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
 .apply(ParDo.of(new createIndex()));

Я следил за всем, что мог найти в Интернете, просматривал необработанный исходный код ParDo и не был уверен, что нужно сделать.Ошибка, которую я получаю:

java.lang.IllegalArgumentException: ParDo requires its input to use KvCoder in order to use state and timers.

Я посмотрел примеры здесь и здесь .

Я понимаю, что это простая проблема, но из-за отсутствия достаточного количества примеров или документации я не смог ее решить.Буду признателен за любую помощь.Спасибо!

1 Ответ

0 голосов
/ 26 апреля 2018

Хорошо, я продолжил работать над проблемой и, прочитав какой-то источник, смог решить ее.Оказывается, что для ввода ParDo.of(new DoFn()) требуется, чтобы вход имел форму KV<T,U>.

Поэтому, чтобы прочитать файл и создать индекс для каждой строки, мне нужно пропустить его через объект Key Value Pair.Ниже я добавил код:

public static class FakeKvPair extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(KV.of("", c.element()));
  }
}

И изменил конвейер на:

Pipeline p = Pipeline.create(options);

p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
 .apply(ParDo.of(new FakeKvPair()))
 .apply(ParDo.of(new createIndex()));

Новая проблема, которая возникает, заключается в том, сохраняется ли порядок строк, так как я выполняюдополнительная функция ParDo (которая может потенциально изменить порядок строк) перед передачей на createIndex().

На моем локальном компьютере порядок сохранен, но я не уверен, как это масштабируется до Dataflow.Но я задам это другим вопросом.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...