Преобразуйте сообщение KAFKA json в класс case и сохраните его в Cassandra, используя подход RDD, используя play json - PullRequest
0 голосов
/ 16 июня 2020

Я новичок в потоковой передаче Spark. И вот мое требование: у меня есть json следующего формата:

{"brand":"blowfish","category":"shoes","description":"Blowfish Women's Hiro Whiskey Dcpu Ankle-high Synthetic Sandal - 8m","mode":"visa signature","orditems":"3","productcode":"404e242a-070f-4fbb-a271-489c92049b05","unitprice":"7630"}

Теперь в моем проекте требуется использовать Play json и подход RDD i необходимо сохранить его в таблице кассандры .Table: transction_table. keyspace: krios. Я могу добиться этого с помощью фрейма данных. Но в соответствии с дизайном мы должны следовать подходу RDD и классу case.

Пожалуйста, черт возьми. После преобразования json в класс Case я не получаются методы "saveToCassandra".

Вот мой пример кода:

import TransctionCoreClass.TransctionData
import com.datastax.spark.connector.cql.CassandraConnector
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import play.api.libs.json._
import play.api.libs.functional.syntax._
import com.datastax.spark.connector.streaming._
import play.api.libs.functional.syntax._
import play.api.libs.json.{JsPath, Json, Reads, Writes}

object TransctionCoreRDDFormat {


  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("org.apache.spark.streaming.kafka010").setLevel(Level.OFF)
 case class TransctionData(productcode:String,description:String,brand:String,category:String,unitprice:String,orditems:String,mode:String)

  val transactionSchema = StructType(Array(
    StructField("brand", StringType, true),
    StructField("category", StringType, true),
    StructField("description", StringType, true),
    StructField("mode", StringType, true),
    StructField("ordItems", DoubleType, true),
    StructField("productCode", StringType, true),
    StructField("unitPrice", StringType, true)))

  val spark = SparkSession.builder
    .master("local")
    .appName("TransctionReceiver")
    .getOrCreate();
  val ssc = new StreamingContext(spark.sparkContext, Seconds(30))
  import spark.implicits._

  ssc.sparkContext.setLogLevel("ERROR")
  val topics = List("-first_topic")
  val configuration = Configuration.apply(ConfigFactory.load.resolve)
  val productStreanm=
    KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics,buildKafkaProperties(configuration)))

  productStreanm.foreachRDD{
    rddRaw=>{
      println("Entrringgg")
      val transrdd = rddRaw.map(x=>convertToCaseClass(x.value().toString))
 ===============Now how to store it in cassandra?????============ as I am  not getting option to saveToCassnadra method available as part of transrdd.






    }


  }




  implicit val transctionReads:Reads[TransctionData]={
    (

      ( JsPath \ "brand").read[String] and
        (JsPath \ "category").read[String] and
        (JsPath \ "description").read[String] and
        (JsPath \ "mode").read[String] and
        (JsPath \ "ordItems").read[String] and
        (JsPath \ "productCode").read[String] and
        (JsPath \ "unitPrice").read[String]

      )(TransctionData.apply _)
  }

  implicit val transctionFormat:Writes[TransctionData]=new Writes[TransctionData]{
    def writes(trns: TransctionData)=Json.obj(
      "brand" -> JsString(trns.brand),
      "category" -> JsString(trns.category),
      "description" -> JsString(trns.description),
      "mode" -> JsString(trns.mode),
      "ordItems" -> JsString(trns.orditems),
      "productCode" -> JsString(trns.productcode),
      "unitPrice" -> JsString(trns.unitprice)

    )
  }

  val cassnadraConnector=CassandraConnector(
    ssc.sparkContext.getConf
      .set("spark.cassandra.connection.host","localhost")
      .set("spark.cassandra.connection.port","9042")
      .set("spark.cassandra.auth.username","cassandra")
      .set("spark.cassandra.auth.password","cassandra")
  )

  def buildKafkaProperties(configuration: Configuration)=Map (

    "bootstrap.servers" -> configuration.kafka.brokerList,
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "auto.offset.reset" ->"earliest",
    "offset" ->"20",
    //"max.poll.records"->20,
    //  "enable.auto.commit" ->false,
    "group.id" ->"grou1pu774u77i3u8"
  )


  def convertToCaseClass(data:String):Option[TransctionData]= {


    val parsedJsValue = Json.parse(data)
    val parsed = Json.fromJson[TransctionData](parsedJsValue)
    parsed
    parsed match{

      case s @ JsSuccess(_,_) => s.asOpt
      case e :JsError         =>None

    }

  }


  ssc.start()
  // println(s"==========Spark context Sarted ]======${ssc.sparkContext.appName}")
  ssc.awaitTermination()










}
...