Если вы хотите обработать набор сообщений, который определяется начальным и конечным смещением в потоковой передаче искры, вы можете использовать следующий код:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "groupId"
)
val offsetRanges = Array(
OffsetRange("input", 0, 2, 4) // <-- topic name, partition number, fromOffset, untilOffset
)
val sparkContext: SparkContext = ???
val rdd = KafkaUtils.createRDD(sparkContext, kafkaParams.asJava, offsetRanges, PreferConsistent)
// other proccessing and saving
Более подробную информацию о потоковой интеграции интеграции и Kafka можно найти:https://spark.apache.org/docs/2.4.0/streaming-kafka-0-10-integration.html