Мое приложение Kafka считывает потоковые данные в реальном времени, обрабатывает их и сохраняет в Hive. Я пытаюсь зафиксировать смещение с помощью commitAsync
.Я получаю исключение ниже:
Причина: java.io.NotSerializableException: Объект org.apache.spark.streaming.kafka010.DirectKafkaInputDStream сериализуется, возможно, как часть закрытия операции RDD,Это связано с тем, что на объект DStream ссылаются из замыкания.Пожалуйста, перепишите операцию RDD внутри этого DStream, чтобы избежать этого.Это было применено, чтобы избежать раздувания задач Spark с ненужными объектами.
Ниже приведен рабочий процесс моего кода:
public void method1(SparkConf conf,String app)
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> tuple2) throws Exception {
return tuple2.value();
}
});
records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) throws Exception {
if(!rdd.isEmpty()) {
methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
}
}
});
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
}
});
javaStreamContext.start();
javaStreamContext.awaitTermination();
}
Оцените любые предложения.
Приведенный ниже код работает и фиксирует смещение после обработки данных.Но проблема в том, что он обрабатывает дубликаты в следующем случае: допустим, задание потребителя выполняется, а таблица кустов содержит 0 записей, а текущее смещение равно (FORMAT- fromOffest, tillOffset, Difference): 512 512 0 Затем я произвел1000 записей, и к тому времени, когда он прочитал 34 записи, но не зафиксировал, я убил его 512 546 34
Я вижу, что к этому времени 34 записи уже были загружены в таблицу Hive
Далее я перезапустил приложение.Я вижу, что он снова читает 34 записи (вместо чтения 1000-34 = 76 записей), хотя он уже обработал их и загрузил в Hive 512 1512 1000 И затем через несколько секунд он обновляется.1512 1512 0 Улей теперь имеет (34 + 1000 = 1034)
Это приводит к дублированию записей (дополнительно 34) в таблице.Как указано в коде, я фиксирую смещение только после обработки / загрузки в таблицу Hive.
Пожалуйста, предложите.
public void method1(SparkConf conf,String app)
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> tuple2) throws Exception {
return tuple2.value();
}
});
records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) throws Exception {
if(!rdd.isEmpty()) {
methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
}
}
});
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
for (OffsetRange offset : offsetRanges) {
System.out.println(offset.fromOffset() + " " + offset.untilOffset()+ " "+offset.count());
}
}
});
javaStreamContext.start();
javaStreamContext.awaitTermination();
}