Мое приложение для потоковой передачи с использованием искры берет только один раз каждую запись, когда я использую его на локальном компьютере, но при развертывании его на отдельном кластере оно дважды читает одно и то же сообщение от Kafka.Кроме того, я дважды проверил, что это не проблема, связанная с производителем kafka.
Вот как я создаю поток:
val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
Subscribe[String, String]("aTopic", kafkaParams))
Это конфигурация kafkaParams:
"bootstrap.servers" -> KAFKA_SERVER,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
Кластер имеет 2 подчиненных с одним рабочим на подчиненных, похоже, что каждый рабочий принимает одно и то же сообщение.Кто-нибудь может мне помочь, пожалуйста?
РЕДАКТИРОВАТЬ
Например, когда я отправляю одно очко от Кафки.Из этого кода:
stream.foreachRDD((rdd, time) => {
if (!rdd.isEmpty) {
log.info("Taken "+rdd.count()+ " points")
}
}
Я получаю "Taken 2 points"
.Если я напечатаю их, они равны.Я что-то не так делаю?
Я использую
- "org.apache.spark" %% "spark-streaming-kafka-0-10"% "2.2.0"
- искра 2.2.0
- kafka_2.11-0.11.0.1