Я пытаюсь создать потокового потребителя Spache луча Apache (чтение из кинесиса), который должен непрерывно прослушивать поток. Я создал приложение, основанное на примере WordCount, и изменил его для чтения из потока кинезиса. На этом этапе мы ограничены использованием SparkRunner. Обычно это работает, как и ожидалось, при чтении из горизонта потока (вывод производится и т. Д.). Но при запуске конвейера с помощью spark-submit в потоковом режиме, если в потоке больше нет доступных данных, исключение java.lang.IllegalArgumentException: требование не выполнено: нет зарегистрированных операций вывода, поэтому ничего для выполнения не выбрасывается и не завершается задолго до завершения работы,Рассматривая это далее, я заметил, что DStreamingGraph ожидает, что outputStream будет зарегистрирован. Запущенное родное искровое потоковое приложение не ведет себя так. Я ожидаю, что потоковое приложение будет работать постоянно.
Я что-то пропустил? Помощь будет принята с благодарностью. Версия Beam 2.16. Версия Spark 2.4.0
Ошибка:
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:168)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
at org.apache.beam.runners.spark.SparkRunner.lambda$run$0(SparkRunner.java:206)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
КОД:
static void readWordCountFromStream(RunningOptions options) {
Pipeline p = Pipeline.create(options);
Properties properties = new Properties();
KinesisIO.Read kinesisReader = KinesisCustomConfigurator.initReader(options, properties);
TextIO.Write textWriter = TextIO.write()
.withWindowedWrites()
.withNumShards(1)
.to(options.getOutputPath());
PCollection<String> lines = p.apply("ReadLines", kinesisReader)
.apply(ParDo.of(new KinesisRecordToStringDetailed()))
.apply("FixedWindows",Window.<String>into(FixedWindows.of(Utilities.resolveDuration(options.getWindowDuration()))));
lines.apply("CountWords", new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", textWriter);
PipelineResult result = p.run();
result.waitUntilFinish();
}