Я новичок в Искра Потоковое программирование, пожалуйста, кто-нибудь объясните мне, в чем проблема
Я имею в виду, что я повторяю нулевую структуру, но у меня есть класс производителя, который работает нормально
мой исходный код:
public class Main3 implements java.io.Serializable {
public static JavaDStream<Double> pr;
public void consumer() throws Exception{
// Configure Spark to connect to Kafka running on local machine
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
Collection<String> topics = Arrays.asList("testing");
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka10WordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
final JavaInputDStream<ConsumerRecord<String, String>> receiver=
KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String,String>Subscribe(topics,kafkaParams));
JavaDStream<String> stream = receiver.map(new Function<ConsumerRecord<String,String>, String>() {
@Override
public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
return kafkaRecord.value();
}
});
stream.foreachRDD( x->x.saveAsTextFile("/home/khouloud/Desktop/exemple/b")); //that does no do any thing
stream.foreachRDD( x-> {
x.collect().stream().forEach(n-> System.out.println("item of list: "+n));
}); // also this i see any thing in the console
stream.foreachRDD( rdd -> {
if (rdd.isEmpty()) System.out.println("its empty"); }); //nothing`
JavaPairDStream<Integer, List<Double>> points= stream.mapToPair(new PairFunction<String, Integer, List<Double>>(){
@Override
public Tuple2<Integer, List<Double>> call(String x) throws Exception {
String[] item = x.split(" ");
List<Double> l = new ArrayList<Double>();
for (int i= 1 ; i < item.length ; i++)
{
l.add(new Double(item[i]));
}
return new Tuple2<>(new Integer(item[0]), l);
}}
);`
Ошибка -
`org.apache.spark.SparkException: задача не сериализуется в
org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 340)
в
org.apache.spark.util.ClosureCleaner $ .org $ апач $ искра $ Util $ ClosureCleaner $$ чистый (ClosureCleaner.scala: 330)
в
org.apache.spark.util.ClosureCleaner $ .Почистить (ClosureCleaner.scala: 156)
в org.apache.spark.SparkContext.clean (SparkContext.scala: 2294) в
org.apache.spark.streaming.dstream.DStream $$ anonfun $ Карта $ 1.Apply (DStream.scala: 547)
в
org.apache.spark.streaming.dstream.DStream $$ anonfun $ Карта $ 1.Apply (DStream.scala: 547)
в
org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151)
в
org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 112)
в org.apache.spark.SparkContext.withScope (SparkContext.scala: 701)
в
org.apache.spark.streaming.StreamingContext.withScope (StreamingContext.scala: 265)
в org.apache.spark.streaming.dstream.DStream.map (DStream.scala: 546)
в
org.apache.spark.streaming.api.java.JavaDStreamLike $ class.mapToPair (JavaDStreamLike.scala: 163)
в
org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapToPair (JavaDStreamLike.scala: 42)
в Min.calculDegSim (Min.java:43) в SkyRule.execute (SkyRule.java:34)
на Main3.consumer (Main3.java:159) на
Исполнитель $ 2.run (Executer.java:27) в
java.lang.Thread.run (Thread.java:748) Причина:
java.io.NotSerializableException: график неожиданно обнуляется, когда
DStream сериализуется. Стек сериализации:
в
org.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala: 40)
в
org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 46)
в
org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 100)
в
org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 337)