I am new to Spark Streaming, and here is my requirement: I have JSON in the following format:
{"brand":"blowfish","category":"shoes","description":"Blowfish Women's Hiro Whiskey Dcpu Ankle-high Synthetic Sandal - 8m","mode":"visa signature","orditems":"3","productcode":"404e242a-070f-4fbb-a271-489c92049b05","unitprice":"7630"}
Per the project design I have to use Play JSON and the RDD approach, and save the data into a Cassandra table (keyspace: krios, table: transction_table). I can achieve this with a DataFrame, but the design requires the RDD and case class approach.
Please help. After converting the JSON into the case class, the saveToCassandra method is not available.
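For reference, the DataFrame version that works for me looks roughly like this (a sketch from memory, using the connector's DataFrame source and the same stream and schema as in the code below; the exact options may differ):

productStream.foreachRDD { rddRaw =>
  import spark.implicits._
  val jsonDs = rddRaw.map(_.value()).toDS()          // Dataset[String] of raw JSON records
  val df = spark.read.schema(transactionSchema).json(jsonDs)
  df.write
    .format("org.apache.spark.sql.cassandra")        // Cassandra DataFrame source
    .options(Map("keyspace" -> "krios", "table" -> "transction_table"))
    .mode("append")
    .save()
}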
Here is my sample code:
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.streaming._
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import play.api.libs.functional.syntax._
import play.api.libs.json._
object TransctionCoreRDDFormat {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("org.apache.spark.streaming.kafka010").setLevel(Level.OFF)

  // One transaction record; field names mirror the lowercase JSON keys.
  case class TransctionData(productcode: String, description: String, brand: String,
                            category: String, unitprice: String, orditems: String, mode: String)
  // Schema left over from my DataFrame attempt (not used in the RDD path);
  // field names match the incoming JSON keys, and all values arrive as strings.
  val transactionSchema = StructType(Array(
    StructField("brand", StringType, true),
    StructField("category", StringType, true),
    StructField("description", StringType, true),
    StructField("mode", StringType, true),
    StructField("orditems", StringType, true),
    StructField("productcode", StringType, true),
    StructField("unitprice", StringType, true)))
  val spark = SparkSession.builder
    .master("local")
    .appName("TransctionReceiver")
    .getOrCreate()

  val ssc = new StreamingContext(spark.sparkContext, Seconds(30))
  import spark.implicits._
  ssc.sparkContext.setLogLevel("ERROR")
  val topics = List("-first_topic")
  // Configuration is our project-specific wrapper around Typesafe Config
  val configuration = Configuration.apply(ConfigFactory.load.resolve)

  val productStream =
    KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, buildKafkaProperties(configuration)))
  productStream.foreachRDD { rddRaw =>
    println("Entering foreachRDD")
    // x.value() is already a String, so no .toString needed
    val transrdd = rddRaw.map(x => convertToCaseClass(x.value()))   // RDD[Option[TransctionData]]
    // ========== Now how do I store this in Cassandra? ==========
    // saveToCassandra is not available on transrdd (see my attempt below, after the listing).
  }
  // The Reads paths must match the lowercase JSON keys, and the fields must be
  // listed in the same order as the case class constructor arguments.
  implicit val transctionReads: Reads[TransctionData] = (
    (JsPath \ "productcode").read[String] and
    (JsPath \ "description").read[String] and
    (JsPath \ "brand").read[String] and
    (JsPath \ "category").read[String] and
    (JsPath \ "unitprice").read[String] and
    (JsPath \ "orditems").read[String] and
    (JsPath \ "mode").read[String]
  )(TransctionData.apply _)
  implicit val transctionWrites: Writes[TransctionData] = new Writes[TransctionData] {
    def writes(trns: TransctionData) = Json.obj(
      "brand" -> JsString(trns.brand),
      "category" -> JsString(trns.category),
      "description" -> JsString(trns.description),
      "mode" -> JsString(trns.mode),
      "orditems" -> JsString(trns.orditems),
      "productcode" -> JsString(trns.productcode),
      "unitprice" -> JsString(trns.unitprice)
    )
  }
  val cassandraConnector = CassandraConnector(
    ssc.sparkContext.getConf
      .set("spark.cassandra.connection.host", "localhost")
      .set("spark.cassandra.connection.port", "9042")
      .set("spark.cassandra.auth.username", "cassandra")
      .set("spark.cassandra.auth.password", "cassandra")
  )
  def buildKafkaProperties(configuration: Configuration) = Map(
    "bootstrap.servers" -> configuration.kafka.brokerList,
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "auto.offset.reset" -> "earliest",
    // "offset" -> "20",           // not a standard Kafka consumer property
    // "max.poll.records" -> 20,
    // "enable.auto.commit" -> false,
    "group.id" -> "grou1pu774u77i3u8"
  )
  def convertToCaseClass(data: String): Option[TransctionData] = {
    val parsedJsValue = Json.parse(data)
    Json.fromJson[TransctionData](parsedJsValue) match {
      case s @ JsSuccess(_, _) => s.asOpt
      case _: JsError          => None
    }
  }
  ssc.start()
  // println(s"==========Spark context Started======${ssc.sparkContext.appName}")
  ssc.awaitTermination()
}
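From the connector documentation I gather the RDD save should look roughly like the sketch below (the import, the flatMap to drop failed parses, and the column names are my reading of the docs and of my table definition). Is this the right way to go from an RDD[Option[TransctionData]] to Cassandra?

import com.datastax.spark.connector._   // implicit saveToCassandra for RDDs

productStream.foreachRDD { rddRaw =>
  // flatten away the Nones so we save an RDD[TransctionData], not RDD[Option[...]]
  val transrdd = rddRaw.flatMap(x => convertToCaseClass(x.value()))
  transrdd.saveToCassandra("krios", "transction_table",
    SomeColumns("productcode", "description", "brand", "category",
                "unitprice", "orditems", "mode"))
}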