Spark kafka avro производитель в структурированном потоке - PullRequest
0 голосов
/ 24 апреля 2018

У меня есть работающая UDF, которая создает побочный эффект, отправляет сообщение kafka в avro, которое, как я знаю, не является целью UDF. Я не мог найти хороший способ сделать это, и это работает ... но мне интересно, действительно ли это плохая идея. У кого-нибудь есть лучший способ сделать это?

#if you don't have a schema reg
var testSchema = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"
    val df = Seq(
      ("1")
    ).toDF("f1")

    val topic = "mytest"
    val brokers = "kafka01:9092"
    val schemaRegistryURL = "http://sr:8081"
    val subjectValueName = topic + "-value"

    val KafkaAvroProducerFunct: (String => String) = (value: String) => {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", classOf[KafkaAvroSerializer].getCanonicalName)
      props.put("value.serializer", classOf[KafkaAvroSerializer].getCanonicalName)
      props.put("schema.registry.url", schemaRegistryURL)
      val producer = new KafkaProducer[GenericRecord, GenericRecord](props)
      val vProps = new kafka.utils.VerifiableProperties(props)
      val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(subjectValueName)
      val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
      val avroRecord = new GenericData.Record(messageSchema)
      avroRecord.put("f1", value)
      //val record = new ProducerRecord(topic, "key", avroRecord)
      val record = new ProducerRecord[GenericRecord, GenericRecord](topic, avroRecord)
      producer.send(record)
      "sent"
    }
...