Spark Streaming создает пустой каталог во время чтения из Kafka - PullRequest
0 голосов
/ 04 марта 2019

У меня есть приложение Spark Streaming для чтения записей с Kafka.Я ожидаю, что он будет работать по следующему алгоритму:

  • получить некоторое конкретное смещение из внешней базы данных
  • читать Кафку, начиная со смещения, полученного на предыдущем шаге
  • если Кафка выдает исключение OffsetOutOfRangeException, начните читать Кафку с наименьшего из доступных смещений

Алгоритм реализован с помощью обработки исключений (я не знаю, есть ли лучшие способы сделать это, но я так и сделал):

    val kafkaParams = Map(
  "metadata.broker.list" -> (currentHost + ":" + kafkaBrokerPort.toString),
  "auto.offset.reset" -> "smallest",
  "enable.auto.commit" -> "true",
  "group.id" -> "test"
)

    try {
  val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets, messageHandler)
  processKafkaStream(lines)
}
catch {
  case _: kafka.common.OffsetOutOfRangeException =>
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    processKafkaStream(lines)
}

(в приведенном выше коде fromOffsets имеет тип Map [TopicAndPartition, Long]; Long - это значение смещения, полученного из внешней базы данных).

ПроблемаЯ сталкиваюсь со следующим: если я предоставляю начальное смещение слишком маленьким, и Кафка выдает исключение, приложение обрабатывает его, но вместо того, чтобы читать тему с самого начала и сохранять весь ее контент в качестве первого пакета в HDFS, оно толькосоздает каталог с пустыми файлами в HDFS для первого пакета, а затем продолжает добавлять новые данные в качестве дальнейшегоэр партии

Тема в Kafka содержит довольно мало данных, поэтому я определенно ожидал бы, что первая партия будет большим файлом.Вместо этого я получаю файлы _SUCCESS и part-00000 как пустые.

Если я пытаюсь реализовать приложение без обработки исключений и того же kafkaParams, я получаю все данные из темы в первом пакете, но янужен тот алгоритм, который позволяет мне читать Kafka с определенного смещения.

Итак, вопрос: почему я получаю пустые файлы в качестве первого пакета вместо всего содержимого темы?Есть ли способ заставить его работать в соответствии с алгоритмом выше?

Среда:

Scala: 2.11.8

Spark: 2.3.1

Кафка: 1.0.1

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...