, поэтому я борюсь с данными kafka json, используя потоковую передачу по искрам, и застрял в разбиении 15 различных схем таблиц, которые входят в один файл json, на 15 DF, чтобы я мог создавать схемы поверх каждой из них.ошибка несоответствия: 70: ошибка: несоответствие типов; найдено org.apache.spark.sql.ColumnName обязательно: kafkarec =>? .please help.
spark-shell --master yarn-client \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf" \
--jars kafkaspark/spark-streaming-kafka-0-10-assembly_2.11-2.3.0.jar
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.sql.{SQLContext,SaveMode}
import org.apache.spark.sql.SQLContext.implicits._
var kafkaParams = Map[String, Object](
"bootstrap.servers" -> "hpca04r01n02.hpc.com:9092,hpca04r02n02.hpc.com:9092,hpca04r03n02.hpc.com:9092,hpcb04r03n02.hpccom:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "dops",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
kafkaParams = kafkaParams + ("security.protocol" -> "SASL_PLAINTEXT")
val ssc = new StreamingContext(sc, Seconds(20))
val topics = Array("dsc-10147-qlsvo-oap-lz")
case class kafkarec(cskey:String,csvalue:String)
import spark.implicits._
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
).map{e => kafkarec(e.key(),e.value())}
stream.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
rdd.toDF.coalesce(1).write.mode(SaveMode.Append).json("hdfs:///user/qlsvokafka/kafoapnew")
}})
ssc.start()
+-------------------+--------------------+
| cskey| csvalue|
+-------------------+--------------------+
|QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
|QLSMGR+UNIT_LANG |{"data": {"ID_PRO...
|QLSMGR+UNIT_MASTER |{"data": {"SERIAL...|
|QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
|QLSMGR+UNIT_MASTER |{"data": {"SERIAL...|
|QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
|QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
+-------------------+--------------------+
Now I like to have thios RDD partitioned/split/reduce by each differnt cskey/schema something like this:
+-------------------+--------------------+
| cskey| csvalue|
+-------------------+--------------------+
|QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
|QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...|
|QLSMGR+UNIT_CONCERN|{"data": {"QLS_UN...
+-------------------+--------------------+
| cskey| csvalue|
+-------------------+--------------------+
|QLSMGR+UNIT_MASTER |{"data": {"SERIAL...|
|QLSMGR+UNIT_MASTER |{"data": {"SERIAL...|
+-------------------+--------------------+
| cskey| csvalue|
+-------------------+--------------------+
so I can parse csvalue per schema .
I could use rdd.toDF.filter($"cskey".isin("QLSMGR+UNIT_CONCERN")).show() and it could work but I'm trying to think like groupby/reducedbyKey work or not but get error:
stream.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext._
import org.apache.spark.sql._
import sqlContext.implicits._
val batchDF = rdd.cache
batchDF.groupBy($"cskey").toDF("qdf").show(20)}})
but I receive like mismatch error
<console>:70: error: type mismatch;
found : org.apache.spark.sql.ColumnName
required: kafkarec => ?