Как добавить JsonDeserializer к потребителю Kafka в Котлине? - PullRequest
0 голосов
/ 08 июля 2019

Как я могу добавить JsonDeserializer к моему потребителю Kafka. Я получаю строку Json и хочу преобразовать ее в объект класса данных.

Это мой потребитель:

ConsumerConfig:

@EnableKafka
@Configuration
class KafkaConsumerConfig {
@Value("\${kafka.host:localhost}")
private val host: String? = null

@Value("\${kafka.port:9092}")
private val port: Int = 0

@Bean
fun userConsumerConfigs(): ConsumerFactory<String, String> {
    val props = HashMap<String, Any>()
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "$host:$port"
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[ConsumerConfig.GROUP_ID_CONFIG] = "helloworld"
    val mapper = ObjectMapper()
    return DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())

    //return props
}


@Bean
fun kafkaListenerContainerFactory(): 
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, 
String>> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = userConsumerConfigs()
    return factory
}

@Bean
fun kafkaConsumer(): Consumerz {
    return Consumerz()
}

companion object {
    const val TOPIC = "test"
}
}

Потребитель:

class Consumerz {
@KafkaListener(topics = ["usertest"])
fun receive(message: String) {
     LOGGER.info("Received payload= $message")

}

companion object {
    private val LOGGER = LoggerFactory.getLogger(Consumerz::class.java)
}
}

Я сейчас пользуюсь только StringDeserializer. Как я могу реализовать JsonDeserializer в этом случае.

Токовый выход: Received payload= { "firstName": "Jack", "lastName" : "Adam" }

Заранее спасибо.

@ это проект Spring .

1 Ответ

0 голосов
/ 08 июля 2019

Вы можете напрямую использовать org.springframework.kafka.support.serializer.JsonDeserializer.class, как показано ниже

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

Итак, потребительская конфигурация будет:

private ConsumerFactory<String, MyDomainModel> myMessageFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "KAPP");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyDomainModel.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyDomainModel> myMessageListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MyDomainModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(myMessageFactory());
    return factory;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...