У меня есть приложение Spark Streaming для чтения записей с Kafka.Я ожидаю, что он будет работать по следующему алгоритму:
- получить некоторое конкретное смещение из внешней базы данных
- читать Кафку, начиная со смещения, полученного на предыдущем шаге
- если Кафка выдает исключение OffsetOutOfRangeException, начните читать Кафку с наименьшего из доступных смещений
Алгоритм реализован с помощью обработки исключений (я не знаю, есть ли лучшие способы сделать это, но я так и сделал):
val kafkaParams = Map(
"metadata.broker.list" -> (currentHost + ":" + kafkaBrokerPort.toString),
"auto.offset.reset" -> "smallest",
"enable.auto.commit" -> "true",
"group.id" -> "test"
)
try {
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
ssc, kafkaParams, fromOffsets, messageHandler)
processKafkaStream(lines)
}
catch {
case _: kafka.common.OffsetOutOfRangeException =>
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
processKafkaStream(lines)
}
(в приведенном выше коде fromOffsets имеет тип Map [TopicAndPartition, Long]; Long - это значение смещения, полученного из внешней базы данных).
ПроблемаЯ сталкиваюсь со следующим: если я предоставляю начальное смещение слишком маленьким, и Кафка выдает исключение, приложение обрабатывает его, но вместо того, чтобы читать тему с самого начала и сохранять весь ее контент в качестве первого пакета в HDFS, оно толькосоздает каталог с пустыми файлами в HDFS для первого пакета, а затем продолжает добавлять новые данные в качестве дальнейшегоэр партии
Тема в Kafka содержит довольно мало данных, поэтому я определенно ожидал бы, что первая партия будет большим файлом.Вместо этого я получаю файлы _SUCCESS и part-00000 как пустые.
Если я пытаюсь реализовать приложение без обработки исключений и того же kafkaParams, я получаю все данные из темы в первом пакете, но янужен тот алгоритм, который позволяет мне читать Kafka с определенного смещения.
Итак, вопрос: почему я получаю пустые файлы в качестве первого пакета вместо всего содержимого темы?Есть ли способ заставить его работать в соответствии с алгоритмом выше?
Среда:
Scala: 2.11.8
Spark: 2.3.1
Кафка: 1.0.1