Непоследовательная выборка потребителем искровой кафки - PullRequest
0 голосов
/ 26 мая 2018

Я написал код для извлечения записей из кафки в спарк.Я столкнулся с каким-то странным поведением.Это потребляет в непоследовательном порядке.

 val conf = new SparkConf()
  .setAppName("Test Data")
  .set("spark.cassandra.connection.host", "192.168.0.40")
  .set("spark.cassandra.connection.keep_alive_ms", "20000")
  .set("spark.executor.memory", "1g")
  .set("spark.driver.memory", "2g")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.executor.instances", "4")
  .set("spark.executor.cores", "3")
  .set("spark.cores.max", "12")
  .set("spark.driver.cores", "4")
  .set("spark.ui.port", "4040")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "30")
  .set("spark.local.dir", "//tmp//")
  .set("spark.sql.warehouse.dir", "/tmp/hive/")
  .set("hive.exec.scratchdir", "/tmp/hive2")

val spark = SparkSession
  .builder
  .appName("Test Data")
  .config(conf)
  .getOrCreate()

import spark.implicits._

val sc = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val topics = Map("topictest" -> 1)
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "192.168.0.40:2181",
  "group.id" -> "=groups",
  "auto.offset.reset" -> "smallest")

val kafkaStream =  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER)
}

kafkaStream.foreachRDD(rdd =>
  {
    if (!rdd.partitions.isEmpty) {
      try {
        println("Count of rows " + rdd.count())
      } catch {
        case e: Exception => e.printStackTrace
      }
    } else {
      println("blank rdd")
    }
  }) 

Итак, изначально я произвел 10 миллионов записей в Кафке.Теперь производитель остановлен, а затем запустил Spark Consumer Application.Я проверил пользовательский интерфейс Spark, первоначально я получал 700 000-900 000 записей на пакет (каждые 10 секунд) на поток, затем начал получать записи 4-6K на пакет.Поэтому я хотел понять, почему количество выборок так сильно сократилось, несмотря на тот факт, что данные в Kafka присутствуют, поэтому вместо того, чтобы давать 4 КБ для каждой партии, я открыт для покупателя большой партии.Что можно сделать и как?

Спасибо,

...