Десериализовать сообщения kafka в KafkaConsumer с помощью springboot - PullRequest
1 голос
/ 30 марта 2020

У меня есть приложение springboot, которое прослушивает сообщения kafka и конвертирует их в объект

@KafkaListener(topics = "test", groupId = "group_id")
public void consume(String message) throws IOException {
        ObjectMapper objectMapper = new ObjectMapper();
        Hostel hostel = objectMapper.readValue(message, Hostel.class);
}

Я думаю, что если я могу сделать это напрямую

@KafkaListener(topics = "test", groupId = "group_id")
public void consume(Hostel hostel) throws IOException { 
}

1 Ответ

2 голосов
/ 30 марта 2020

Вы можете сделать это, используя spring-kafka. Но тогда вам нужно использовать пользовательский десериализатор (или JsonDeserializer) на фабрике контейнеров

@KafkaListener(topics = "test", groupId = "my.group", containerFactory = "myKafkaFactory")
fun genericMessageListener(myRequest: MyRequest, ack: Acknowledgment) {
//do Something with myRequest
ack.acknowledge()
}

Ваш Контейнерный завод будет выглядеть примерно так:

@Bean
fun myKafkaFactory(): ConcurrentKafkaListenerContainerFactory<String, MyRequest> {
val factory = ConcurrentKafkaListenerContainerFactory<String, MyRequest>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configProps(), StringDeserializer(), MyRequestDeserializer())
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
return factory
}

Ваш десериализатор будет выглядеть как

public class MyRequestDeserializer implements Deserializer {
private static ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(Map map, boolean b) {
}

@Override
public MyRequest deserialize(String arg0, byte[] msgBytes) {
    try {
        return objectMapper.readValue(new String(msgBytes), MyRequest.class);
    } catch (IOException ex) {
        log.warn("JSON parse/ mapping exception occurred. ", ex);
        return new MyRequest();
    }
}

@Override
public void close() {
    log.debug("MyRequestDeserializer closed");
}
}

В качестве альтернативы, вы можете использовать значение по умолчанию JsonDeserializer, как указано в spring docs

...