Запуск подсчета потокового слова на FlinkCluster с использованием Beam - PullRequest
0 голосов
/ 01 мая 2019

Я пытаюсь запустить модифицированный пример WindowedWordCount(NewWordCount) с использованием луча на локальном кластере flink.Теперь я хочу, чтобы он непрерывно считывал данные из указанного локального каталога, делал wordcount на основе окна и затем выводил по одному файлу для каждого окна.Окно основано на времени (1 мин).А для вывода триггер может быть основан на времени или записи.Я использую Flink 1.7 Beam 2.11

NewWordCount - это код, который я пробовал.Я использую команду:

mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.NewWordCount -D exec.args="--runner=FlinkRunner --flinkMaster=localhost --filesToStage=.\target\word-count-beam-bundled-0.1.jar --inputFile='<dir path>\input\*' --output='<dir path>\output_streaming\count' --streaming=true --parallelism=1" -P flink-runner

Это прекрасно работает для файлов в каталоге.Он читает все файлы, вычисляет выходные данные и создает два выходных файла с количеством слов.Конвейер продолжает работать, но он не читает новый файл, помещенный в каталог (как я понимаю, потоковая передача должна).Ни выход не является непрерывным, ни основанным на окне, даже в случае огромного ввода

public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory<Long> {
  @Override
  public Long create(PipelineOptions options) {
    return options.as(Options.class).getMinTimestampMillis()
      + Duration.standardHours(1).getMillis();
  }
}

...

static void runWindowedWordCount(Options options) throws IOException {
  final String output = options.getOutput();
  final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
  final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());

  Pipeline pipeline = Pipeline.create(options);
  Pipeline p = Pipeline.create(options);

    pipeline
        .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)))
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                                      .triggering(AfterWatermark.pastEndOfWindow()
                                      .withEarlyFirings(
                                          AfterProcessingTime.pastFirstElementInPane()
                                              .plusDelayOf(Duration.standardMinutes(1)))
                                      .withLateFirings(
                                          AfterProcessingTime.pastFirstElementInPane()
                                              .plusDelayOf(Duration.standardMinutes(2))))
                              .withAllowedLateness(Duration.standardMinutes(1))
                              .accumulatingFiredPanes())
        .apply(new WordCount.CountWords())
        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
        .apply(new WriteOneFilePerWindow(output, options.getNumShards()));

public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

  runWindowedWordCount(options);
 }
}

1 Ответ

0 голосов
/ 02 мая 2019

Я не знаю, какое входное преобразование вы используете, поскольку не могу найти входное преобразование в вашем примере кода. Если вы хотите непрерывный ввод, вы должны использовать неограниченный источник. Для TextIO, watchForNewFiles делает работу.

...