Я пытаюсь проверить работу ручной фиксации смещения.
Когда я пытаюсь выйти из задания либо с помощью thread.sleep () / jssc.stop () /, генерируя исключения в цикле while,Я вижу, что смещения совершаются.
Я просто отправляю пару сообщений для тестирования, но я вижу 0 лагов, как только задание начинает обрабатывать пакет.
Когда Spark фактически фиксирует смещения?
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
kafkaStream.foreachRDD(kafkaStreamRDD -> {
// fetch kafka offsets for manually committing it later
OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();
// filter unwanted data
kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
//filter logic here
}).foreachPartition(kafkaRecords -> {
//Initializing DB connections
while (kafkaRecords.hasNext()) {
//doing some work here
//-----> EXCEPTION
throw new Exception();
}
});
// commit offsets saveOffsets after processing
((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
if (exception != null) {
System.out.println("-------------Unable to commit offsets, something went wrong, trace ------------"+ exception.getCause());
exception.printStackTrace(); // need this for driver
} else {
System.out.println("Successfully committed offsets"); // need this for driver
for (OffsetRange offsetRange : offsetRanges) {
System.out.println("Offset Info: paratition {}, fromOffset {} untilOffset {}: "+ offsetRange.partition() +":"+ offsetRange.fromOffset() +":"+ offsetRange.untilOffset());
}
}
});
enable.auto.commit
: false
Соблюдать throw new Exception();
в цикле while. Даже если пакет завершится неудачно из-за исключения, я вижу зафиксированное смещение, я ожидаю некоторой задержки здесь, поскольку обработка не удалась, что здесь не так?