Я читаю потоковые сообщения, используя KafkaUtils.createDirectStream и использую commitAsync для фиксации смещения после их обработки (т.е. после их загрузки в таблицу Hive).Я вижу, что между загрузкой улья и фиксацией смещения существует определенный временной интервал, примерно от половины минуты до одной минуты.Другое дело, что метод фиксации запускается с определенной частотой.Как упомянуто выше, он срабатывает каждые полминуты и не зависит от времени / частоты поступления данных.
Таким образом, в этом случае, если мое приложение будет убито за время досмещение сохраняется, это происходит, чтобы прочитать этот конкретный пакет данных снова.Могу ли я контролировать время / частоту, когда смещение фиксируется (т. Е. Срабатывает commitAsyn), чтобы уменьшить задержку?Или вы думаете, что commitSync поможет в этом случае?И если да, могу ли я использовать commitSync с KafkaUtils.createDirectStream для чтения сообщений?Можете ли вы привести пример, как реализовать API commitSync с KafkaUtils.createDirectStream?
public void method1(SparkConf conf,String app)
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> recs= KafkaUtils.createDirectStream(javaStreamContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
recs.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
JavaRDD<String> javardd = rdd.map(tuple2 -> tuple2.value() );
if(!javardd.isEmpty()) {
JavaRDD<String> cache = javardd.cache();
<MethodToLoadIntoHive>
((CanCommitOffsets)recs.inputDStream()).commitAsync(offsetRanges);
}
});
javaStreamContext.start();
javaStreamContext.awaitTermination();
}