Этот вопрос связан с моей более ранней публикацией о сравнительном тестировании Apache Beam с генератором данных на лету.
У меня есть следующий код для генерации данных в моем конвейере:
PCollection<Long> data = pipeline.apply(GenerateSequence.from(1)
.withMaxReadTime(Duration.millis(3000)));
//Print generated data
data.apply(ParDo.of(new DoFn<Long, String>() {
@ProcessElement
public void processElement(@Element Long input) {
System.out.println(input);
}
}));
pipeline.run();
Если я запускаю этот код с DirectRunner (--runner = direct), я не вижу сгенерированные значения на моей консоли.
Если я запускаю тот же код с FlinkRunner (--runner = FlinkRunner), я могу увидеть сгенерированные данные в выводе консоли, как показано ниже
4106
4109
4083
.
.
.
Другая проблема заключается в том, что, хотя я объявляю максимальное время чтения 3 секунды, генератор никогда не останавливается!
Если я просто опускаю максимальное время чтения из кода и запускаю только следующее:
PCollection<Long> data = pipeline.apply(GenerateSequence.from(1));
//Print generated data
data.apply(ParDo.of(new DoFn<Long, String>() {
@ProcessElement
public void processElement(@Element Long input) {
System.out.println(input);
}
}));
pipeline.run();
DirectRunner и FlinkRunner могут без проблем выводить сгенерированные значения на консоль.
У кого-нибудь есть идея, почему я мог столкнуться с этой проблемой?