Как использовать разные темы Kafka в каждой партии задания Spark Streaming? - PullRequest
0 голосов
/ 30 апреля 2018

Я почти уверен, что не существует простого способа сделать это, но вот мой вариант использования:

У меня есть задание Spark Streaming (версия 2.1.0) с продолжительностью 5 секунд для каждого микропакета.

Моя цель состоит в том, чтобы потреблять данные из одной темы в каждом интервале микробакта из 250 тем Кафки. Вы можете взять приведенный ниже код в качестве простого примера:

 val groupId:String = "first_group"
 val kafka_servers:String =  "datanode1:9092,datanode2:9092,datanode3:9092"

 val ss:SparkSession = SparkSession.builder().config("spark.streaming.unpersist","true").appName("ConsumerStream_test").getOrCreate()
 val ssc:StreamingContext= new StreamingContext(ss.sparkContext,Duration(5000))

val kafka_parameters:Map[String,Object]=Map(
"bootstrap.servers"       -> kafka_servers,
"key.deserializer"        -> classOf[StringDeserializer],
"value.deserializer"      -> classOf[ByteArrayDeserializer],
"heartbeat.interval.ms"   -> (1000:Integer),
"max.poll.interval.ms"    -> (100:Integer),
"enable.auto.commit"      -> (false: java.lang.Boolean),
"autoOffsetReset"         -> OffsetResetStrategy.EARLIEST,
//"connections.max.idle.ms" -> (5000:Integer),
"group.id"                -> groupId
)

val r = scala.util.Random
val kafka_list_one_topic=List("topic_"+ r.nextInt(250))

val consumer:DStream[ConsumerRecord[String,Array[Byte]]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferBrokers, ConsumerStrategies.
Subscribe[String, Array[Byte]](kafka_list_one_topic , kafka_parameters))

consumer.foreachRDD( eachRDD => {
     // DOING SOMETHING WITH THE DATA...
  })
ssc.start()
ssc.awaitTermination()

Но проблема этого подхода заключается в том, что Spark будет запускать исходный код (все перед командой foreachRDD) один раз, чтобы создать потребительский DStream Kafka, но в следующем микропакете он только запускает «foreachRDD». "заявление.

В качестве примера предположим, что r.nextInt (250) вернул 40. Задание Spark Streaming подключится к topic_40 и обработает его данные. Но в следующих микропакетах он все равно будет подключаться к topic_40 и будет игнорировать все команды перед оператором foreachRDD.

Полагаю, это ожидаемо, поскольку код перед оператором foreachRDD выполняется только на драйвере Spark.

У меня вопрос, есть ли способ, которым я могу сделать это без необходимости перезапуска приложения Spark каждые 5 секунд?

Спасибо.

1 Ответ

0 голосов
/ 02 мая 2018

Мой подход был бы очень простым, если вы хотите, чтобы он был действительно случайным и не заботился о каких-либо других последствиях, сделайте kafka_list_one_topic в качестве изменяемой переменной и измените ее в коде потоковой передачи.

val r = scala.util.Random
var kafka_list_one_topic=List("topic_"+ r.nextInt(250))

val consumer:DStream[ConsumerRecord[String,Array[Byte]]] = 
KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferBrokers, 
ConsumerStrategies.
Subscribe[String, Array[Byte]](kafka_list_one_topic , kafka_parameters))

consumer.foreachRDD( eachRDD => {
 // DOING SOMETHING WITH THE DATA...
 kafka_list_one_topic=List("topic_"+ r.nextInt(250))
 })
ssc.start()
ssc.awaitTermination()
...