Я пытаюсь запустить модифицированный пример WindowedWordCount(NewWordCount)
с использованием луча на локальном кластере flink
.Теперь я хочу, чтобы он непрерывно считывал данные из указанного локального каталога, делал wordcount
на основе окна и затем выводил по одному файлу для каждого окна.Окно основано на времени (1 мин).А для вывода триггер может быть основан на времени или записи.Я использую Flink 1.7 Beam 2.11
NewWordCount - это код, который я пробовал.Я использую команду:
mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.NewWordCount -D exec.args="--runner=FlinkRunner --flinkMaster=localhost --filesToStage=.\target\word-count-beam-bundled-0.1.jar --inputFile='<dir path>\input\*' --output='<dir path>\output_streaming\count' --streaming=true --parallelism=1" -P flink-runner
Это прекрасно работает для файлов в каталоге.Он читает все файлы, вычисляет выходные данные и создает два выходных файла с количеством слов.Конвейер продолжает работать, но он не читает новый файл, помещенный в каталог (как я понимаю, потоковая передача должна).Ни выход не является непрерывным, ни основанным на окне, даже в случае огромного ввода
public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory<Long> {
@Override
public Long create(PipelineOptions options) {
return options.as(Options.class).getMinTimestampMillis()
+ Duration.standardHours(1).getMillis();
}
}
...
static void runWindowedWordCount(Options options) throws IOException {
final String output = options.getOutput();
final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());
Pipeline pipeline = Pipeline.create(options);
Pipeline p = Pipeline.create(options);
pipeline
.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)))
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))
.withLateFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(2))))
.withAllowedLateness(Duration.standardMinutes(1))
.accumulatingFiredPanes())
.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()))
.apply(new WriteOneFilePerWindow(output, options.getNumShards()));
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
runWindowedWordCount(options);
}
}