Исключение: при попытке обновить смещение в zookeper (Подход 2: Прямой Подход) MapPartitionsRDD не может быть приведен к oHasOffsetRanges - PullRequest
0 голосов
/ 11 октября 2018

Я использую apache Spark 1.6.2 и Kafka 0-10

Сообщение чтения кода от Kafka (подход Direact) со смещением от Zookeper org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream (....) // Преобразование JavaInputDStream в JavaPairDStream (JavaPairDStream) streamReadFromZookeeper .mapToPair (tuple -> new Tuple2 ((tuple._1 ()), tuple._2 ())); `

        streamReadFromKaafka.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
            @Override
            public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {

                System.out.println("Started Reading message ");
                OffsetRange[] offsets = ((org.apache.spark.streaming.kafka.HasOffsetRanges) rdd.rdd()).offsetRanges();
                            offsetRanges.set(offsets);
                return rdd;
            }
        }).foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, String> rdd) throws IOException {


                for (OffsetRange o : offsetRanges.get()) {
                    Map<Object, Object> mm = new HashMap();
                    offsetRangesStr = o.partition() + ":" + o.fromOffset() + ",";
                    // mm.put(o, o.partition() +":" + o.fromOffset());

                    partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + o.partition();

                }
                offsetRangesStr = offsetRangesStr.substring(0, offsetRangesStr.length() - 1);

                ZkUtils.updatePersistentPath(zkClient, partitionOffsetPath, offsetRangesStr);
                return null;
            }
        });`



I am trying to update offset into Zookeepr ,When I am trying to read messages from kafka with offset from zookeper getting  below Exception 

    java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
            at com.cts.peg.iot.cg.CheckPointingZookeeper$5.call(CheckPointingZookeeper.java:170)
            at com.cts.peg.iot.cg.CheckPointingZookeeper$5.call(CheckPointingZookeeper.java:165)
            at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:380)
            at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:381)
            at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:381)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...