ClassNotFoundException с потребителем Кафки - PullRequest
0 голосов
/ 23 мая 2018

У меня есть потребительское приложение Kafka, написанное на Spring Boot 2.0.2.Когда я получаю сообщение в моем приемнике, я получаю следующую ошибку:

Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.test.demo.domain.Account]; nested exception is java.lang.ClassNotFoundException: com.test.demo.domain.Account

Имя класса объекта в источнике: " com.test.demo.domain.Account "но у меня есть другой пакет и имя класса в потребителе.

Когда я переупаковываю имя класса потребителя, чтобы соответствовать производителям, все работает нормально.Однако я считаю, что не должен был этого делать.

Кто-нибудь знает проблему?

==== ОБНОВЛЕНО ====

Код моего производителя:

@Bean public ProducerFactory<String, Account> accountProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
    return new DefaultKafkaProducerFactory<>(configProps); }

@Bean public KafkaTemplate<String, Account> accountKafkaTemplate() {
    ProducerFactory<String, Account> factory = accountProducerFactory();

    return new KafkaTemplate<>(factory); }

Код потребителя:

public ConsumerFactory<String, Account> accountConsumerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
    return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Account> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(accountConsumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

Исключение:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.kconsumer.accountconsumer.service.AccountConsumer.accountListener(com.kconsumer.accountconsumer.domain.Account)]
Bean [com.kconsumer.accountconsumer.service.AccountConsumer@444cc791]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.kconsumer.accountconsumer.domain.Account] for GenericMessage [payload={"id":"5b079d0b340d9ef2ac9b6f02","name":"test-400","version":0}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6a0a6b0b, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=5b079d0b340d9ef2ac9b6f02, kafka_receivedPartitionId=0, kafka_receivedTopic=ktest, kafka_receivedTimestamp=1527225611820, __TypeId__=[B@2f92e17a}], failedMessage=GenericMessage [payload={"id":"5b079d0b340d9ef2ac9b6f02","name":"test-400","version":0}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6a0a6b0b, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=5b079d0b340d9ef2ac9b6f02, kafka_receivedPartitionId=0, kafka_receivedTopic=ktest, kafka_receivedTimestamp=1527225611820, __TypeId__=[B@2f92e17a}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:257)

Ответы [ 2 ]

0 голосов
/ 23 мая 2018

Если вы используете @KafkaListener, используйте StringDeserializer или ByteArrayDserializer и добавьте StringJsonMessageConverter @Bean в контекст приложения.

Затем ...

@KafkaListener(...)
public void listen(Account account) {
    ...
}

... требуемый тип учетной записи передается в конвертер.

См. документацию .

РЕДАКТИРОВАТЬ

Вам не нужна фабрика соединений, boot обнаружит конвертер и подключит его для вас.

Вот пример:

@SpringBootApplication
public class So50478267Application {

    public static void main(String[] args) {
        SpringApplication.run(So50478267Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Account1> template) {
        return args -> template.send("so50478267", new Account1("foo.inc"));
    }

    @KafkaListener(id = "listen", topics = "so50478267")
    public void listen(Account2 account) {
        System.out.println(account);
    }

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50478267", 1, (short) 1);
    }

    public static class Account1 {

        private final String customer;

        public Account1(String customer) {
            this.customer = customer;
        }

        public String getCustomer() {
            return this.customer;
        }

    }

    public static class Account2 {

        private String customer;

        public Account2() {
            super();
        }

        public String getCustomer() {
            return this.customer;
        }

        public void setCustomer(String customer) {
            this.customer = customer;
        }

        @Override
        public String toString() {
            return "Account2 [customer=" + this.customer + "]";
        }

    }

}

и

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
0 голосов
/ 23 мая 2018

Как вы десериализовали свой объект записи Kafka?Если вы используете десериализаторы из API Spring-kafka, такие как StringDeserializer, JsonDeserializer ... Я думаю, что невозможно изменить имя пакета объекта записи.

Если вы пишете собственные Serdes для записи Kafka самостоятельно, то вы можете сделать это в десериализациилогика путем переопределения метода readClassDescriptor () из ObjectInputStream.Пожалуйста, найдите образец кода здесь .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...