Как конвертировать сообщения Kafka AVRO в потоковый фрейм данных Spark в версии 2.0.2, как конвертировать потоки Kafka Direct в Dataframe - PullRequest
0 голосов
/ 23 мая 2019

Невозможно сохранить сообщения Avro в потоковый фрейм данных spark / scala в версии spark 2.0.2.Как преобразовать потоки Kafka Direct в Dataframe.

Я использую золотые ворота и слитного производителя Kafka для получения изменений в базе данных Oracle.Я получаю сообщения Кафки в формате Avro.В более новых версиях это очень просто, но в версии Spark 2.0.2 возникают трудности.

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.parsing.json.JSON

object consumerexample {


  def main(args: Array[String]): Unit = {



    //Define function to convert from GenericRecord to Row
    def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row = {
      val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
      import scala.collection.JavaConversions._
      for (field <- record.getSchema.getFields) {
        objectArray(field.pos) = record.get(field.pos)
      }

      new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
    }

    val spark = SparkSession
      .builder()
      .appName("EmailMonitoring")
      .master("local[4]")
           .getOrCreate();


    val ssc = new StreamingContext(spark.sparkContext,Seconds(5))

    val BOOTSTRAP_SERVERS = "ashaplq00005.stg-tfayd.com"
    val TOPIC = Array("testserver.testserver")
    val subjectValueName = TOPIC + "-value"
    val path = "./"
    val schemaRegistryURL  = "http://ashaplq00005.stg-tfayd.com:8081"

    val timer = new Thread() {
         override def run() {
          Thread.sleep(1000 * 30)
           spark.stop()
         }
        }


//    val  kafkaParams = new Properties()
//    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "testserver")
//    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
//    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,  "io.confluent.kafka.serializers.KafkaAvroDeserializer")
//    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "VenkatConsumer")
//    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest")
//    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false")
//    kafkaParams.put("schema.registry.url", schemaRegistryURL)
//    val consumer = new KafkaConsumer[String, String](kafkaParams);


    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "testserver",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
      "group.id" -> "venkatdirectstream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> ("false"),
      "schema.registry.url"->schemaRegistryURL
    )
    //consumer.subscribe(Collections.singletonList(TOPIC))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](TOPIC, kafkaParams)
    )




    stream.foreachRDD { rdd =>
      val json: DataFrame = SparkSession.builder().getOrCreate().read.json(rdd.map(x => x.value()))
      //json.rdd.saveAsTextFile("data/" + UID + "/" + i.incrementAndGet())
      println( ": Found: " + rdd.count() + " lines")
    }


    ssc.start()
    ssc.awaitTermination()




}
}

Я хочу сохранить сообщения в датафрейме.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...