Я использую apache Spark 1.6.2 и Kafka 0-10
Сообщение чтения кода от Kafka (подход Direact) со смещением от Zookeper org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream (....) // Преобразование JavaInputDStream в JavaPairDStream (JavaPairDStream) streamReadFromZookeeper .mapToPair (tuple -> new Tuple2 ((tuple._1 ()), tuple._2 ())); `
streamReadFromKaafka.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
System.out.println("Started Reading message ");
OffsetRange[] offsets = ((org.apache.spark.streaming.kafka.HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
return rdd;
}
}).foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
@Override
public Void call(JavaPairRDD<String, String> rdd) throws IOException {
for (OffsetRange o : offsetRanges.get()) {
Map<Object, Object> mm = new HashMap();
offsetRangesStr = o.partition() + ":" + o.fromOffset() + ",";
// mm.put(o, o.partition() +":" + o.fromOffset());
partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + o.partition();
}
offsetRangesStr = offsetRangesStr.substring(0, offsetRangesStr.length() - 1);
ZkUtils.updatePersistentPath(zkClient, partitionOffsetPath, offsetRangesStr);
return null;
}
});`
I am trying to update offset into Zookeepr ,When I am trying to read messages from kafka with offset from zookeper getting below Exception
java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
at com.cts.peg.iot.cg.CheckPointingZookeeper$5.call(CheckPointingZookeeper.java:170)
at com.cts.peg.iot.cg.CheckPointingZookeeper$5.call(CheckPointingZookeeper.java:165)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:380)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:381)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:381)