У меня есть работающая UDF, которая создает побочный эффект, отправляет сообщение kafka в avro, которое, как я знаю, не является целью UDF. Я не мог найти хороший способ сделать это, и это работает ... но мне интересно, действительно ли это плохая идея. У кого-нибудь есть лучший способ сделать это?
#if you don't have a schema reg
var testSchema = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"
val df = Seq(
("1")
).toDF("f1")
val topic = "mytest"
val brokers = "kafka01:9092"
val schemaRegistryURL = "http://sr:8081"
val subjectValueName = topic + "-value"
val KafkaAvroProducerFunct: (String => String) = (value: String) => {
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", classOf[KafkaAvroSerializer].getCanonicalName)
props.put("value.serializer", classOf[KafkaAvroSerializer].getCanonicalName)
props.put("schema.registry.url", schemaRegistryURL)
val producer = new KafkaProducer[GenericRecord, GenericRecord](props)
val vProps = new kafka.utils.VerifiableProperties(props)
val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(subjectValueName)
val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
val avroRecord = new GenericData.Record(messageSchema)
avroRecord.put("f1", value)
//val record = new ProducerRecord(topic, "key", avroRecord)
val record = new ProducerRecord[GenericRecord, GenericRecord](topic, avroRecord)
producer.send(record)
"sent"
}