Мне нужно загрузить схему AVRO во время выполнения, и мне нужно передать серверы bootstrap и kafka topi c, чтобы определить правильную схему, но я не могу найти способ передачи этих параметров на десериализаторе (кроме жесткого кодирования) их). У вас есть идеи, как это сделать?
val ops: SerializationOptions = PipelineOptionsFactory.`as`(SerializationOptions::class.java)
ops.setKafkaTopic(pars.kafkaTopic)
ops.setKafkaBootstrapServers(pars.kafkaBootstrapServers)
ops.setKafkaSchemaRegistry(pars.kafkaSchemaRegistry)
val p = Pipeline.create(ops)
p.apply( KafkaIO.read<String, Measurement>()
.withTopic(pars.kafkaTopic)
.withBootstrapServers(pars.kafkaBootstrapServers)
.withKeyDeserializer(StringDeserializer::class.java)
.withValueDeserializer(RemoteAvroDeserializer::class.java)
.withoutMetadata()
)
.apply(Values.create())
(TransformToMeasurementFN()))
.apply(
Window.into<Measurement>(FixedWindows.of(Duration.standardSeconds(10))))
.apply("FilterOrderMeasurement ", ParDo.of<Measurement, String>(RemoveRendersFn()))
.apply(Count.perElement())
.apply("CalculateMeasurementValue", ParDo.of<KV<String, Long>, Long>(CountDuplicateFN()))
p.run()
Это мой десериализатор:
class RemoteAvroDeserializer : Deserializer<Measurement> {
val decoder: BinaryMessageDecoder<Measurement>
public constructor() {
val schemaStore = RemoteKafkaSchemaRegistry("tst_kafka_topic","tst_bootstrap_servers")
decoder = Measurement.createDecoder(schemaStore)
}
override fun deserialize(s: String, bytes: ByteArray): Measurement {
return decoder.decode(bytes)
}
override fun configure(p0: MutableMap<String, *>?, p1: Boolean) {
}
override fun close() {
}
}