Когда я пытаюсь запустить следующее задание Spark Streaming (V 1.6), оно не потребляет никаких сообщений, но в тот момент, когда я завершаю задание, используя Ctrl + C.Он печатает сообщение из сокета и выходит из программы
`SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(Integer.parseInt(args[0])));
JavaReceiverInputDStream<String> StreamingLines = streamingContext.socketTextStream(args[1], Integer.parseInt(args[2]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = StreamingLines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String str) throws Exception {
System.out.println("Msg recieved is"+str);
return Arrays.asList(str.split(" "));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String str) {
return new Tuple2<>(str, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
});
wordCounts.print();
streamingContext.start();
streamingContext.awaitTermination();`
Команда отправки Spark:
spark-submit --name test --master yarn --deploy-mode client --num-executors 3 --conf spark.dynamicAllocation.enabled=false --class sparkTest.WordCountSocketEx /var/log/sparkTest-0.0.1-SNAPSHOT.jar 60 localhost 9999
Ниже приведен мой снимок экрана интерфейса пользователя RM
и скриншот Spark Executors