Весенний загрузчик десериализации класса Kafka - не в доверенном пакете - PullRequest
0 голосов
/ 02 апреля 2019

Я знаю, что эта проблема очень распространена, но, следуя различным решениям, я не смог найти работающего. Я хочу десериализовать строки, а также свой собственный объект класса при получении сообщения в Kafka. С String все хорошо, но не с моим классом. Я добавил доверенные пакеты в конфигурации потребителя (com.springmiddleware.entities - это пакет, в котором находится мой класс):

@Bean
    public Map<String, Object> consumerConfigs() {


        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");

        return props;
    }

У меня есть это в моем application.yml файле:

spring:
kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'com.springmiddleware.entities'

И добавил эти строки в application.properties

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.springmiddleware.entities
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer 
spring.kafka.producer.properties.spring.json.add.type.headers=false

Но появляется следующая ошибка:

org.apache.kafka.common.errors.SerializationException: Ошибка десериализация ключ / значение для раздела topic2-0 со смещением 1. Если необходимо, пожалуйста, пройдите мимо записи, чтобы продолжить потребление. Вызванный: java.lang.IllegalArgumentException: класс 'com.springmiddleware.entities.Crime' отсутствует в доверенных пакетах: [java.util, java.lang]. Если вы считаете, что этот класс безопасен для десериализовать, пожалуйста, укажите его имя. Если сериализация только сделано из надежного источника, вы также можете включить доверять всем (*).

UPDATE

ReceiverConfig:

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {


        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new StringDeserializer(),
                new JsonDeserializer<>());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

ОБНОВЛЕНИЕ 2

Listener Class (Receiver): 

    @KafkaListener(topics = "${app.topic.foo}")
@Service
public class Receiver {


     private CountDownLatch latch = new CountDownLatch(1);

        public CountDownLatch getLatch() {
            return latch;
        }

   @KafkaHandler
    public void listen(@Payload Crime message) {

            System.out.println("Received " + message);
    }

   @KafkaHandler
    public void listen(@Payload String message) {

        System.out.println("Received " +  message);
}

1 Ответ

1 голос
/ 02 апреля 2019

Просто используйте перегруженный JsonDeserializer конструктор

Начиная с версии 2.2, вы можете явно настроить десериализатор на использование предоставленного целевого типа и игнорировать информацию о типе в заголовках, используя один из перегруженных конструкторовкоторые имеют логическое значение useHeadersIfPresent (которое по умолчанию имеет значение true).

В следующем примере показано, как это сделать:

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

Ваш код:

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new StringDeserializer(),
            new JsonDeserializer<>(Object.class,false));
}

А теперь используйте @KafkaListener на уровне класса

@KafkaListener(topics = "myTopic")
@Service
public class MultiListenerBean {

@KafkaHandler
public void listen(Cat cat) {
    ...
}

@KafkaHandler
public void listen(Hat hat) {
    ...
}

@KafkaHandler(isDefault = true)
public void delete(Object obj) {
    ...
   }

}
...