Как эффективно читать каждое сообщение в теме Кафки с помощью Spark (используя scala)? - PullRequest
0 голосов
/ 08 апреля 2019

У меня есть задача прочитать каждое сообщение в теме Кафки, которая имеет 3 раздела.У меня есть Spark и Scala в качестве инструментов для этого.

Пока у меня есть следующий код:

val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe
      [String, String](cdrTopic, kafkaParams))

kafkaStream.foreachRDD { rdd => {

      if (!rdd.isEmpty()) {

        rdd.foreachPartition(iterator => {

          while (iterator.hasNext) {

            val partition = iterator.next().value().split("\\n")

            if (!partition.isEmpty) {

              partition.foreach(string => {

                if (!string.isEmpty) {

                  //process every message here
                }

              })
            }

            println("no partition data")

          }

        })

      }
      println("no rdd data to process")
      }
    }

Каждое сообщение представляет собой строку tsv (значения, разделенные табуляцией), которую мне нужно проанализировать и сохранить в db.

Я считаю, что это очень неэффективно, потому что я использую 4 цикла, и я думаю, что некоторые данные теряются, так как количество сообщений, записываемых в БД, очень мало.

Я использую scala версии 2.11.12, spark-streaming 2.4.0, spark-streaming-kafka 2.4.0.

Существует ли эффективный способ чтения и анализа каждого сообщения в kafka с использованием spark?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...