Доброе утро всем,
Я новичок в scala и потоковом искре, мой пример использования состоит в загрузке потока из Кафки в потоковую искру, а затем вasticsearch, вот мой код:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
object KAFKAStreaming {
def main(args: Array[String]): Unit = {
val brokers ="localhost:9092"
val groupid ="GRP1"
val topics ="producer"
val conf = new SparkConf().setMaster("local[*]").setAppName("test")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes","http://localhost:9200")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
sc.setLogLevel("OFF")
val topicSet= topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupid,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
)
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicSet, kafkaParams)
)
// the stream :
//na1;03/04/2020 10:35;23
//na2;04/04/2020 10:35;15
//na1;05/04/2020 10:35;20
//na2;06/04/2020 10:35;12
//na1;07/04/2020 10:35;40
val line = messages.map(_.value)
val name =line.map(a=>("name" -> a.split(";")(0)))
val timestamp =line.map(a=>("timestamp" -> a.split(";")(1)))
val value =line.map(a=>("value" -> a.split(";")(2).toInt))
sc.makeRDD(Seq(name,timestamp,value)).saveToEs("spark/docs")
ssc.start()
ssc.awaitTermination()
}
}
Я получаю эту ошибку: Исключение в потоке "главная" организация. apache .spark.SparkException: задание прервано из-за сбоя этапа: не удалось сериализовать задачу 2, не пытаясь повторить попытку. Исключение во время сериализации: java .io.NotSerializableException: Граф неожиданно обнуляется, когда DStream сериализуется. Стек сериализации:
Я понял, что это проблема сериализации, но я не знал, как ее решить.
полная трассировка ошибок:
Исключение в потоке "main" org. apache .spark.SparkException: задание прервано из-за сбоя этапа: не удалось сериализовать задачу 2, не пытаясь повторить попытку. Исключение во время сериализации: java .io.NotSerializableException: Граф неожиданно обнуляется, когда DStream сериализуется. Стек сериализации:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:274)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:108)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:79)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:74)
at org.elasticsearch.spark.package$SparkRDDFunctions.saveToEs(package.scala:55)
at KAFKAStreaming$.main(KAFKAStreaming.scala:50)
at KAFKAStreaming.main(KAFKAStreaming.scala)