Я использую KafkaUtils.createDirectStream Kafka API для приема сообщений, а затем обрабатываю сообщения для хранения в Hive.
После перезапуска я вижу, что он всегда начинает читать из «последнего» сообщения (потому что в моем коде auto.offset.reset = latest) вместо чтения из последнего подтвержденного сообщения. Это теряет смещение в этом сценарии.
Таким образом, я хочу сохранить смещение после завершения всей обработки, чтобы я мог получить его при сбое при получении сообщения.
Подскажите, пожалуйста, как я могу сохранить смещение в Kafka, используя KafkaUtils.createDirectStream, чтобы убедиться, что все задание Kafka выполнено.