Spark: преобразование сообщений Kafka (.txt) в DataFrame - PullRequest
0 голосов
/ 02 мая 2018

Я получаю ошибку при вызове CDRS.toDF().

/**
  * One call detail record (CDR) built from a single Kafka message.
  *
  * All fourteen columns are kept as raw strings; no parsing or validation
  * is performed by this class.
  */
case class CDR(
  phone: String,
  first_type: String,
  in_out: String,
  local: String,
  duration: String,
  date: String,
  time: String,
  roaming: String,
  amount: String,
  in_network: String,
  is_promo: String,
  toll_free: String,
  bytes: String,
  last_type: String
)

// Topics to read from and the Kafka connection settings (broker list).
val topicsSet = Set[String](kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)

// Open a direct Kafka stream and keep only the second element of every pair
// (presumably the message value; the first element is dropped).
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc,
  kafkaParams,
  topicsSet
).map { case (_, payload) => payload }
//===============================================================================================

// Apply the CDR schema to every message coming from Kafka.
// Each message is split on '|' and must yield at least 14 fields (indices 0..13 are read);
// a shorter record still fails with ArrayIndexOutOfBoundsException, as before.
// NOTE(review): CDRS is a DStream here, so CDRS.toDF() presumably has to be replaced by a
// per-batch conversion inside CDRS.foreachRDD with the SQL implicits imported — confirm
// against the Spark Streaming guide.
val CDRS = messages.map { line =>
  // Limit -1 keeps trailing empty fields, so a record whose last columns are empty
  // still produces all 14 entries instead of a truncated array.
  val f = line.split("\\|", -1)
  CDR(
    f(0), f(1), f(2), f(3), f(4), f(5), f(6),
    f(7), f(8), f(9), f(10), f(11), f(12),
    f(13).replaceAll("\n", "") // strip newline characters from the last field
  )
}
...