Я почти уверен, что не существует простого способа сделать это, но вот мой вариант использования:
У меня есть задание Spark Streaming (версия 2.1.0) с продолжительностью 5 секунд для каждого микропакета.
Моя цель состоит в том, чтобы потреблять данные из одной темы в каждом интервале микробакта из 250 тем Кафки. Вы можете взять приведенный ниже код в качестве простого примера:
val groupId:String = "first_group"
val kafka_servers:String = "datanode1:9092,datanode2:9092,datanode3:9092"
val ss:SparkSession = SparkSession.builder().config("spark.streaming.unpersist","true").appName("ConsumerStream_test").getOrCreate()
val ssc:StreamingContext= new StreamingContext(ss.sparkContext,Duration(5000))
val kafka_parameters:Map[String,Object]=Map(
"bootstrap.servers" -> kafka_servers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[ByteArrayDeserializer],
"heartbeat.interval.ms" -> (1000:Integer),
"max.poll.interval.ms" -> (100:Integer),
"enable.auto.commit" -> (false: java.lang.Boolean),
"autoOffsetReset" -> OffsetResetStrategy.EARLIEST,
//"connections.max.idle.ms" -> (5000:Integer),
"group.id" -> groupId
)
val r = scala.util.Random
val kafka_list_one_topic=List("topic_"+ r.nextInt(250))
val consumer:DStream[ConsumerRecord[String,Array[Byte]]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferBrokers, ConsumerStrategies.
Subscribe[String, Array[Byte]](kafka_list_one_topic , kafka_parameters))
consumer.foreachRDD( eachRDD => {
// DOING SOMETHING WITH THE DATA...
})
ssc.start()
ssc.awaitTermination()
Но проблема этого подхода заключается в том, что Spark будет запускать исходный код (все перед командой foreachRDD) один раз, чтобы создать потребительский DStream Kafka, но в следующем микропакете он только запускает «foreachRDD». "заявление.
В качестве примера предположим, что r.nextInt (250) вернул 40. Задание Spark Streaming подключится к topic_40 и обработает его данные. Но в следующих микропакетах он все равно будет подключаться к topic_40 и будет игнорировать все команды перед оператором foreachRDD.
Полагаю, это ожидаемо, поскольку код перед оператором foreachRDD выполняется только на драйвере Spark.
У меня вопрос, есть ли способ, которым я могу сделать это без необходимости перезапуска приложения Spark каждые 5 секунд?
Спасибо.