Я знаю, что эта проблема очень распространена, но, следуя различным решениям, я не смог найти работающего. Я хочу десериализовать строки, а также свой собственный объект класса при получении сообщения в Kafka. С String все хорошо, но не с моим классом. Я добавил доверенные пакеты в конфигурации потребителя (com.springmiddleware.entities
- это пакет, в котором находится мой класс):
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
return props;
}
У меня есть это в моем application.yml
файле:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: foo
auto-offset-reset: earliest
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: 'com.springmiddleware.entities'
И добавил эти строки в application.properties
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.springmiddleware.entities
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
Но появляется следующая ошибка:
org.apache.kafka.common.errors.SerializationException: Ошибка
десериализация ключ / значение для раздела topic2-0 со смещением 1. Если необходимо,
пожалуйста, пройдите мимо записи, чтобы продолжить потребление. Вызванный:
java.lang.IllegalArgumentException: класс
'com.springmiddleware.entities.Crime' отсутствует в доверенных пакетах:
[java.util, java.lang]. Если вы считаете, что этот класс безопасен для
десериализовать, пожалуйста, укажите его имя. Если сериализация только
сделано из надежного источника, вы также можете включить доверять всем (*).
UPDATE
ReceiverConfig:
@EnableKafka
@Configuration
public class ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
return props;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
ОБНОВЛЕНИЕ 2
Listener Class (Receiver):
@KafkaListener(topics = "${app.topic.foo}")
@Service
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaHandler
public void listen(@Payload Crime message) {
System.out.println("Received " + message);
}
@KafkaHandler
public void listen(@Payload String message) {
System.out.println("Received " + message);
}