Я использую потоковый код spark в рабочей среде и не могу видеть вкладку потоковой передачи, а также не вижу никаких заданий на вкладке заданий пользовательского интерфейса Spark.
Тот же код, который я запустил на локальном компьютере, и он работал нормально.
Я сравнил все параметры, которые я передаю для kafka, такие как имя темы, загрузочные серверы и т. Д., И все выглядит хорошо.
Я не могу понять, что происходит с потоковым вещанием и чего мне не хватает.
Я пытался работать в разных режимах (кластер / клиент), чтобы увидеть проблему, но информация не найдена, и она ведет себя одинаково.
var canCommit = ArrayCanCommitOffsets
var offsetRanges = ArrayArray [OffsetRange]
val kafkaStream = {
val streams = (1 to 3) map {
_ => createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))
}
streams.foreach(stream => {
canCommit :+ stream.asInstanceOf[CanCommitOffsets]
stream.foreachRDD(rdd => {
offsetRanges :+ rdd.asInstanceOf[HasOffsetRanges].offsetRanges
})
})
val unionStrm= ssc.union(streams)
unionStrm.map(_.value())
}
kafkaStream.foreachRDD(rdd => {
***doing some process***
EsSpark.saveJsonToEs(rdd,index)
canCommit.foreach(stream => {
offsetRanges.foreach(ranges =>{
logInfo("Inside ranges")
stream.commitAsync(ranges)
})
})
})
Я передаю следующее как кафкапарм:
kafkaParams = Map(
kafka.startingOffsets -> latest,
kafka.value.deserializer -> org.apache.kafka.common.serialization.StringDeserializer,
zookeeper.connection.timeout.ms -> 30000,
kafka.key.deserializer -> org.apache.kafka.common.serialization.StringDeserializer,
security.protocol -> xxxx_yyyyy,
group.id -> test-consumer-group,
bootstrap.servers -> xxxx:1234,yyyy:1234,
zookeeper.connect -> xxxx:2181,zzzz:2181
)
Может ли кто-нибудь указать причину, по которой не следует запускать какие-либо работы в сфере распространения искры?