У меня есть приложение Spark Streaming, которое читает из раздела Kafka. Я управляю смещениями вручную для восстановления.когда я рушу драйвер, он автоматически перезапускается, и последнее смещение, записанное в теме Кафки, выглядит хорошо.но я подозреваю, что он записывает смещение до того, как исполнитель завершит работу, поэтому я потерял последний кусок данных перед тем, как завершить его.
kafkaSparkInputDStream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
Object results = yourCalculation(rdd);
rdd.for (OffsetRange offsetRange : offsetRanges) {
if (offsetRange.count() >0) {
System.out.println("offsetRange. from [" + offsetRange.fromOffset() +"] until[" + offsetRange.untilOffset()+ "]. count [" + offsetRange.count()+ "] partition [" +offsetRange.partition()+"]" );
}
}
((CanCommitOffsets) kafkaSparkInputDStream.inputDStream()).commitAsync(offsetRanges);