Сбой непрерывного потока Beam SparkRunner из-за отсутствия зарегистрированных операций вывода - PullRequest
0 голосов
/ 24 октября 2019

Я пытаюсь создать потокового потребителя Spache луча Apache (чтение из кинесиса), который должен непрерывно прослушивать поток. Я создал приложение, основанное на примере WordCount, и изменил его для чтения из потока кинезиса. На этом этапе мы ограничены использованием SparkRunner. Обычно это работает, как и ожидалось, при чтении из горизонта потока (вывод производится и т. Д.). Но при запуске конвейера с помощью spark-submit в потоковом режиме, если в потоке больше нет доступных данных, исключение java.lang.IllegalArgumentException: требование не выполнено: нет зарегистрированных операций вывода, поэтому ничего для выполнения не выбрасывается и не завершается задолго до завершения работы,Рассматривая это далее, я заметил, что DStreamingGraph ожидает, что outputStream будет зарегистрирован. Запущенное родное искровое потоковое приложение не ведет себя так. Я ожидаю, что потоковое приложение будет работать постоянно.

Я что-то пропустил? Помощь будет принята с благодарностью. Версия Beam 2.16. Версия Spark 2.4.0

Ошибка: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute at scala.Predef$.require(Predef.scala:224) at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:168) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) at org.apache.beam.runners.spark.SparkRunner.lambda$run$0(SparkRunner.java:206) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

КОД:

static void readWordCountFromStream(RunningOptions options) {
    Pipeline p = Pipeline.create(options);
    Properties properties = new Properties();

    KinesisIO.Read kinesisReader = KinesisCustomConfigurator.initReader(options, properties);

    TextIO.Write textWriter = TextIO.write()
            .withWindowedWrites()
            .withNumShards(1)
            .to(options.getOutputPath());

    PCollection<String> lines = p.apply("ReadLines", kinesisReader)
            .apply(ParDo.of(new KinesisRecordToStringDetailed()))
            .apply("FixedWindows",Window.<String>into(FixedWindows.of(Utilities.resolveDuration(options.getWindowDuration()))));

    lines.apply("CountWords", new CountWords())
            .apply(MapElements.via(new FormatAsTextFn()))
            .apply("WriteCounts", textWriter);

    PipelineResult result = p.run();
    result.waitUntilFinish();

}

...