разделение RDD и группового входа в потоковой передаче дает ошибку несоответствия - PullRequest
0 голосов
/ 21 февраля 2019

, поэтому я борюсь с данными kafka json, используя потоковую передачу по искрам, и застрял в разбиении 15 различных схем таблиц, которые входят в один файл json, на 15 DF, чтобы я мог создавать схемы поверх каждой из них.ошибка несоответствия: 70: ошибка: несоответствие типов; найдено org.apache.spark.sql.ColumnName обязательно: kafkarec =>? .please help.

spark-shell --master yarn-client \
    --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf" \
    --jars kafkaspark/spark-streaming-kafka-0-10-assembly_2.11-2.3.0.jar

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.storage.StorageLevel
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.sql.{SQLContext,SaveMode}
    import org.apache.spark.sql.SQLContext.implicits._
    var kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "hpca04r01n02.hpc.com:9092,hpca04r02n02.hpc.com:9092,hpca04r03n02.hpc.com:9092,hpcb04r03n02.hpccom:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "dops",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
       kafkaParams = kafkaParams + ("security.protocol" -> "SASL_PLAINTEXT")
    val ssc = new StreamingContext(sc, Seconds(20))
        val topics = Array("dsc-10147-qlsvo-oap-lz")

    case class kafkarec(cskey:String,csvalue:String)
    import spark.implicits._


    val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        ).map{e => kafkarec(e.key(),e.value())}  

    stream.foreachRDD(rdd => {
         if (!rdd.isEmpty()) {
         rdd.toDF.coalesce(1).write.mode(SaveMode.Append).json("hdfs:///user/qlsvokafka/kafoapnew")
         }})
         ssc.start()
    +-------------------+--------------------+
    |              cskey|             csvalue|
    +-------------------+--------------------+
    |QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
    |QLSMGR+UNIT_LANG   |{"data": {"ID_PRO...
    |QLSMGR+UNIT_MASTER |{"data": {"SERIAL...|
    |QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
    |QLSMGR+UNIT_MASTER |{"data": {"SERIAL...|
    |QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
    |QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
    +-------------------+--------------------+

    Now I like to have thios RDD partitioned/split/reduce by each differnt cskey/schema something like this:
    +-------------------+--------------------+
    |              cskey|             csvalue|
    +-------------------+--------------------+
    |QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
    |QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
    |QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...
    +-------------------+--------------------+
    |              cskey|             csvalue|
    +-------------------+--------------------+
    |QLSMGR+UNIT_MASTER |{"data": {"SERIAL...|
    |QLSMGR+UNIT_MASTER |{"data": {"SERIAL...|

    +-------------------+--------------------+
    |              cskey|             csvalue|
    +-------------------+--------------------+

    so I can parse csvalue per schema .

I could use rdd.toDF.filter($"cskey".isin("QLSMGR+UNIT_CONCERN")).show() and it could work but I'm trying to think like groupby/reducedbyKey work or not but get error:
stream.foreachRDD(rdd => {
     if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext._
    import org.apache.spark.sql._
    import sqlContext.implicits._
     val batchDF = rdd.cache
     batchDF.groupBy($"cskey").toDF("qdf").show(20)}}) 

but I receive like mismatch error
<console>:70: error: type mismatch;
 found   : org.apache.spark.sql.ColumnName
 required: kafkarec => ?
...