Кафка-сообщение в реальном времени для ElasticSearch с использованием Apache Spark Streaming - PullRequest
1 голос
/ 21 апреля 2020

Доброе утро всем,

Я новичок в scala и потоковом искре, мой пример использования состоит в загрузке потока из Кафки в потоковую искру, а затем вasticsearch, вот мой код:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object KAFKAStreaming {

def main(args: Array[String]): Unit = {
val brokers ="localhost:9092"
val groupid ="GRP1"
val topics ="producer"

val conf = new SparkConf().setMaster("local[*]").setAppName("test")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes","http://localhost:9200")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

sc.setLogLevel("OFF")

val topicSet= topics.split(",").toSet
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupid,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
)

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topicSet, kafkaParams)
)
// the stream :
//na1;03/04/2020 10:35;23
//na2;04/04/2020 10:35;15
//na1;05/04/2020 10:35;20
//na2;06/04/2020 10:35;12
//na1;07/04/2020 10:35;40

val line = messages.map(_.value)
val name =line.map(a=>("name" -> a.split(";")(0)))
val timestamp =line.map(a=>("timestamp" -> a.split(";")(1)))
val value =line.map(a=>("value" -> a.split(";")(2).toInt))

 sc.makeRDD(Seq(name,timestamp,value)).saveToEs("spark/docs")

ssc.start()
ssc.awaitTermination()

}
}

Я получаю эту ошибку: Исключение в потоке "главная" организация. apache .spark.SparkException: задание прервано из-за сбоя этапа: не удалось сериализовать задачу 2, не пытаясь повторить попытку. Исключение во время сериализации: java .io.NotSerializableException: Граф неожиданно обнуляется, когда DStream сериализуется. Стек сериализации:

Я понял, что это проблема сериализации, но я не знал, как ее решить.

полная трассировка ошибок:

Исключение в потоке "main" org. apache .spark.SparkException: задание прервано из-за сбоя этапа: не удалось сериализовать задачу 2, не пытаясь повторить попытку. Исключение во время сериализации: java .io.NotSerializableException: Граф неожиданно обнуляется, когда DStream сериализуется. Стек сериализации:

at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:274)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:108)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:79)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:74)
at org.elasticsearch.spark.package$SparkRDDFunctions.saveToEs(package.scala:55)
at KAFKAStreaming$.main(KAFKAStreaming.scala:50)
at KAFKAStreaming.main(KAFKAStreaming.scala)

Ответы [ 2 ]

0 голосов
/ 21 апреля 2020

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

Вы можете сохранить jsons вasticsearch, как показано ниже.

val yourRdd = Seq(("name","timestamp","value"),("name1","timestamp1","value1")).toDF().rdd
EsSpark.saveJsonToEs(yourRdd, elasticconfig)
0 голосов
/ 21 апреля 2020

В вашем коде есть некоторые заблуждения относительно того, как вы создаете rdd и как вы сохраняете их в elasti c search

sc.makeRDD(Seq(name,timestamp,value)).saveToEs("spark/docs")

Из ваших code name, timestamp and values уже rdd, поэтому создание Последовательность их и rdd не имеет смысла, следовательно, ошибка.

То, что вы хотите сделать, это:

name.saveToEs("spark/names")
timestamps.saveToEs("spark/timestamps")
value.saveToEs("spark/values")

Если вы хотите, чтобы все в одной коллекции делали класс case вокруг полученных сообщений на поток, расширить с большим количеством информации, а затем сохранить только один rdd в ES

...