Приведенный ниже код работает и фиксирует смещение после обработки данных.Но проблема в том, что он обрабатывает дубликаты в следующем случае:
Задание потребителя выполняется, и таблица кустов содержит 0 записей, а текущее смещение равно (FORMAT- fromOffest, tillOffset, Difference): 512 512 0
Затем я произвел 1000 записей, и к тому времени, когда он прочитал 34 записи, но не зафиксировал, я уничтожил ее 512 546 34
Я вижу, что к этому времени 34 записи уже были загруженык таблице Hive
Далее я перезапустил приложение.
Я вижу, что оно снова читает 34 записи (вместо чтения 1000-34 = 76 записей), хотя оно уже обработало их изагружен в Hive 512 1512 1000, а затем через несколько секунд он обновляется.1512 1512 0 Улей теперь имеет (34 + 1000 = 1034)
Это приводит к дублированию записей (дополнительно 34) в таблице.Как упоминалось в коде, я фиксирую смещение только после обработки / загрузки в таблицу Hive.
public void method1(SparkConf conf,String app)
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> tuple2) throws Exception {
return tuple2.value();
}
});
records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) throws Exception {
if(!rdd.isEmpty()) {
methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
}
}
});
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
for (OffsetRange offset : offsetRanges) {
System.out.println(offset.fromOffset() + " " + offset.untilOffset()+ " "+offset.count());
}
}
});
javaStreamContext.start();
javaStreamContext.awaitTermination();
}