Исключение Kafka при фиксации смещения с использованием commitAsync - PullRequest
0 голосов
/ 27 января 2019

Мое приложение Kafka считывает потоковые данные в реальном времени, обрабатывает их и сохраняет в Hive. Я пытаюсь зафиксировать смещение с помощью commitAsync.Я получаю исключение ниже:

Причина: java.io.NotSerializableException: Объект org.apache.spark.streaming.kafka010.DirectKafkaInputDStream сериализуется, возможно, как часть закрытия операции RDD,Это связано с тем, что на объект DStream ссылаются из замыкания.Пожалуйста, перепишите операцию RDD внутри этого DStream, чтобы избежать этого.Это было применено, чтобы избежать раздувания задач Spark с ненужными объектами.

Ниже приведен рабочий процесс моего кода:

public void method1(SparkConf conf,String app) 
    spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
    final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
                new Duration(<spark duration>));
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
    messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> tuple2) throws Exception {
                        return tuple2.value();
                    }
                });

                records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                    @Override
                    public void call(JavaRDD<String> rdd) throws Exception {
                        if(!rdd.isEmpty()) {
                            methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                        }
                    }
                 });
                ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        }
    });
    javaStreamContext.start();
    javaStreamContext.awaitTermination();
}

Оцените любые предложения.


Приведенный ниже код работает и фиксирует смещение после обработки данных.Но проблема в том, что он обрабатывает дубликаты в следующем случае: допустим, задание потребителя выполняется, а таблица кустов содержит 0 записей, а текущее смещение равно (FORMAT- fromOffest, tillOffset, Difference): 512 512 0 Затем я произвел1000 записей, и к тому времени, когда он прочитал 34 записи, но не зафиксировал, я убил его 512 546 34

Я вижу, что к этому времени 34 записи уже были загружены в таблицу Hive

Далее я перезапустил приложение.Я вижу, что он снова читает 34 записи (вместо чтения 1000-34 = 76 записей), хотя он уже обработал их и загрузил в Hive 512 1512 1000 И затем через несколько секунд он обновляется.1512 1512 0 Улей теперь имеет (34 + 1000 = 1034)

Это приводит к дублированию записей (дополнительно 34) в таблице.Как указано в коде, я фиксирую смещение только после обработки / загрузки в таблицу Hive.

Пожалуйста, предложите.

public void method1(SparkConf conf,String app) 
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
            new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));

            JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
                @Override
                public String call(ConsumerRecord<String, String> tuple2) throws Exception {
                    return tuple2.value();
                }
            });

            records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> rdd) throws Exception {
                    if(!rdd.isEmpty()) {
                        methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                    }
                }
             });

             messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
              @Override
              public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);                     
                    for (OffsetRange offset : offsetRanges) {
                        System.out.println(offset.fromOffset() + " " + offset.untilOffset()+ "  "+offset.count());
                    }
                     }
              });             
javaStreamContext.start();
javaStreamContext.awaitTermination();

}

Ответы [ 2 ]

0 голосов
/ 30 января 2019

Код ниже работает. Но я не уверен, фиксирует ли это смещение после обработки в куст, потому что блок commitAsync находится перед вызовом метода хранилища улья.

public void method1(SparkConf conf,String app) 
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
            new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
    @Override
    public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
    }
});
            JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
                @Override
                public String call(ConsumerRecord<String, String> tuple2) throws Exception {
                    return tuple2.value();
                }
            });

            records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> rdd) throws Exception {
                    if(!rdd.isEmpty()) {
                        methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                    }
                }
             });

javaStreamContext.start();
javaStreamContext.awaitTermination();

}

К этому коду, если я добавлю приведенный ниже блок (сразу после инициализации offsetRanges), чтобы напечатать сведения о смещении, он снова не будет работать, выдает такое же исключение

messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
              @Override
              public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {


                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

               rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String,String>>>() {
                   @Override
                   public void call(Iterator<org.apache.kafka.clients.consumer.ConsumerRecord<String,String>> arg0) throws Exception {

                   OffsetRange o = offsetRanges[TaskContext.get().partitionId()];

                   System.out.println(o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
                   }
            });

                ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);

              }
              });

Пожалуйста, оставьте свой комментарий

0 голосов
/ 29 января 2019

Попробуйте переместить ((CanCommitOffsets) messages.inputDStream ()). CommitAsync (offsetRanges); блок foreachRDD

public void method1(SparkConf conf,String app) 
    spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
    final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
                new Duration(<spark duration>));
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
    messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> tuple2) throws Exception {
                        return tuple2.value();
                    }
                });

                records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                    @Override
                    public void call(JavaRDD<String> rdd) throws Exception {
                        if(!rdd.isEmpty()) {
                            methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                        }
                    }
                 });
        }
    });
     ((CanCommitOffsets)  messages.inputDStream()).commitAsync(offsetRanges);
    javaStreamContext.start();
    javaStreamContext.awaitTermination();
}
...