Несоответствие в работе Apache Beam Data Generator с DirectRunner и FlinkRunner - PullRequest
0 голосов
/ 13 мая 2019

Этот вопрос связан с моей более ранней публикацией о сравнительном тестировании Apache Beam с генератором данных на лету.

У меня есть следующий код для генерации данных в моем конвейере:

PCollection<Long> data = pipeline.apply(GenerateSequence.from(1)
                         .withMaxReadTime(Duration.millis(3000)));

//Print generated data
data.apply(ParDo.of(new DoFn<Long, String>() {
  @ProcessElement
  public void processElement(@Element Long input) {
    System.out.println(input);
  }
}));

pipeline.run();

Если я запускаю этот код с DirectRunner (--runner = direct), я не вижу сгенерированные значения на моей консоли.

Если я запускаю тот же код с FlinkRunner (--runner = FlinkRunner), я могу увидеть сгенерированные данные в выводе консоли, как показано ниже

4106
4109
4083
.
.
.

Другая проблема заключается в том, что, хотя я объявляю максимальное время чтения 3 секунды, генератор никогда не останавливается!

Если я просто опускаю максимальное время чтения из кода и запускаю только следующее:

PCollection<Long> data = pipeline.apply(GenerateSequence.from(1));

//Print generated data
data.apply(ParDo.of(new DoFn<Long, String>() {
  @ProcessElement
  public void processElement(@Element Long input) {
    System.out.println(input);
  }
}));

pipeline.run();

DirectRunner и FlinkRunner могут без проблем выводить сгенерированные значения на консоль.

У кого-нибудь есть идея, почему я мог столкнуться с этой проблемой?

...