Как превратить этот простой код Kafka Stream в многопоточный? - PullRequest
0 голосов
/ 21 октября 2019

Я учу Кафку в Скале. Прикрепленный код является просто реализацией подсчета слов с использованием Kafka и Spark Streaming. Как у меня есть отдельное потребительское выполнение за раздел во время потоковой передачи? Пожалуйста, помогите!

Вот мой код:



class ConsumerM(topics: String, bootstrap_server: String, group_name: String) {

  Logger.getLogger("org").setLevel(Level.ERROR)

  val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    .setMaster("local[*]")
    .set("spark.executor.memory","1g")

  val ssc = new StreamingContext(sparkConf, Seconds(1))

  val topicsSet = topics.split(",")

  val kafkaParams = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap_server,
    ConsumerConfig.GROUP_ID_CONFIG -> group_name,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    "auto.offset.reset" ->"earliest")


  val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))


  val lines = messages.map(_.value)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()

  ssc.start()
  ssc.awaitTermination()
}

1 Ответ

0 голосов
/ 22 октября 2019

Если ваша входная тема имеет несколько разделов, то дополнительная настройка local[*] означает, что у вас будет один исполнитель Spark на ядро ​​ЦП, и каждый

может использовать как минимум один раздел.
...