Spring Boot Kafka потребитель с несколькими типами сбоев - PullRequest
0 голосов
/ 03 февраля 2020

Насколько я понимаю, если в одном и том же процессе я имею topic1 = ClassA, topic2 = ClassB, мне нужно 2 фабрики контейнеров?

Мой класс конфигурации:

@Bean
public ConsumerFactory<String, MessageADto> xxxConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(MessageADto.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageADto> xxxListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MessageADto> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(xxxConsumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, MessageBDto> xxx2ConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(MessageBDto.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageBDto> xxx2ListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MessageBDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(xxx2ConsumerFactory());
    return factory;
}

private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaHost());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "xxx");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return props;
}

В моем класс контроллера покоя, у меня есть следующие прослушиватели (только для PO C):

@KafkaListener(topics=KafkaTopicConfig.xxx_TOPIC, containerFactory="xxxListenerContainerFactory")
public void xxxListener(MessageADto message) {
    System.out.println(message.getMessage());
}

@KafkaListener(topics=KafkaTopicConfig.xxx2_TOPIC, containerFactory="xxx2ListenerContainerFactory")
public void xxx2Listener(MessageBDto message) {
    System.out.println(message.getMessage() + " : " + message.getCount());
}

Для метода покоя контроллера я должен отправить MessageADto и MessageBDto:

        MessageBDto messageBDto = new MessageBDto() {{ setMessage(message.getMessage()); setCount(17); }};
        this.kafkaService.sendMessageB(messageBDto);
        return convertToDto(this.kafkaService.sendMessageA(message).get().getRecordMetadata());

Это приводит к исключению:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition dtms2-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.xxx.xxx.controllers.KafkaController$1' is not in the trusted packages: [java.util, java.lang, org.xxx.xxx.dtos]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:125) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:99) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:425) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1012) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:968) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Я не понимаю это исключение. Какое это имеет отношение к моему классу контроллеров?

1 Ответ

0 голосов
/ 03 февраля 2020

Вам нужно добавить свой класс контроллера в список доверенных пакетов

Попробуйте добавить это в свой файл свойств

spring.kafka.consumer.properties.spring.json.trusted.packages=*

Если он работает, вы можете определите, какие пакеты вам нужны, и замените * на имена пакетов, разделенные запятыми.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...