Как передать параметры в десериализатор avro в apache beam (KafkaIO)? - PullRequest
0 голосов
/ 09 января 2020

Мне нужно загрузить схему AVRO во время выполнения, и мне нужно передать серверы bootstrap и kafka topi c, чтобы определить правильную схему, но я не могу найти способ передачи этих параметров на десериализаторе (кроме жесткого кодирования) их). У вас есть идеи, как это сделать?

    val ops: SerializationOptions = PipelineOptionsFactory.`as`(SerializationOptions::class.java)
    ops.setKafkaTopic(pars.kafkaTopic)
    ops.setKafkaBootstrapServers(pars.kafkaBootstrapServers)
    ops.setKafkaSchemaRegistry(pars.kafkaSchemaRegistry)
    val p = Pipeline.create(ops)

    p.apply( KafkaIO.read<String, Measurement>()
            .withTopic(pars.kafkaTopic)
            .withBootstrapServers(pars.kafkaBootstrapServers)
            .withKeyDeserializer(StringDeserializer::class.java)
            .withValueDeserializer(RemoteAvroDeserializer::class.java)
            .withoutMetadata()
    )
            .apply(Values.create())
            (TransformToMeasurementFN()))
            .apply(
                    Window.into<Measurement>(FixedWindows.of(Duration.standardSeconds(10))))
            .apply("FilterOrderMeasurement ", ParDo.of<Measurement, String>(RemoveRendersFn()))
            .apply(Count.perElement())
            .apply("CalculateMeasurementValue", ParDo.of<KV<String, Long>, Long>(CountDuplicateFN()))

    p.run()

Это мой десериализатор:

class RemoteAvroDeserializer : Deserializer<Measurement> {
 val decoder: BinaryMessageDecoder<Measurement>

 public constructor() {
     val schemaStore = RemoteKafkaSchemaRegistry("tst_kafka_topic","tst_bootstrap_servers")
     decoder = Measurement.createDecoder(schemaStore)
 }

 override fun deserialize(s: String, bytes: ByteArray): Measurement {
     return decoder.decode(bytes)
 }

 override fun configure(p0: MutableMap<String, *>?, p1: Boolean) {
 }

 override fun close() {
 }
}

1 Ответ

0 голосов
/ 09 января 2020

Согласно документации Beam, вы можете установить конфигурацию потребителя следующим образом:

  KafkaIO... 
.withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))

Я полагаю, вы можете просто добавить schema.registry.url или что-то еще, здесь

...