As I understand it, if a single process consumes topic1 = ClassA and topic2 = ClassB, I need two container factories. Is that right?
My configuration class:
    @Bean
    public ConsumerFactory<String, MessageADto> xxxConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(MessageADto.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageADto> xxxListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MessageADto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xxxConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, MessageBDto> xxx2ConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(MessageBDto.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageBDto> xxx2ListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MessageBDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xxx2ConsumerFactory());
        return factory;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaHost());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "xxx");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }
In my REST controller class I have the following listeners (for a POC only):
    @KafkaListener(topics = KafkaTopicConfig.xxx_TOPIC, containerFactory = "xxxListenerContainerFactory")
    public void xxxListener(MessageADto message) {
        System.out.println(message.getMessage());
    }

    @KafkaListener(topics = KafkaTopicConfig.xxx2_TOPIC, containerFactory = "xxx2ListenerContainerFactory")
    public void xxx2Listener(MessageBDto message) {
        System.out.println(message.getMessage() + " : " + message.getCount());
    }
In the REST controller method I need to send both a MessageADto and a MessageBDto:
    MessageBDto messageBDto = new MessageBDto() {{ setMessage(message.getMessage()); setCount(17); }};
    this.kafkaService.sendMessageB(messageBDto);
    return convertToDto(this.kafkaService.sendMessageA(message).get().getRecordMetadata());
This results in the following exception:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition dtms2-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.xxx.xxx.controllers.KafkaController$1' is not in the trusted packages: [java.util, java.lang, org.xxx.xxx.dtos]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:125) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:99) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:425) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1012) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:968) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
I don't understand this exception. What does it have to do with my controller class?
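One possible link to the controller, offered as a sketch rather than a confirmed diagnosis: the `new MessageBDto() {{ ... }}` form ("double-brace initialization") does not create a plain `MessageBDto`. It creates an anonymous subclass declared inside the enclosing class, and the compiler names it after that class, which would match `KafkaController$1` in the message. Assuming the producer side (not shown here) uses spring-kafka's `JsonSerializer` with its default type headers, that class name would travel with the record and then fail the trusted-packages check on the consumer. The plain-Java example below only demonstrates the naming behaviour; `DoubleBraceDemo` and its nested `MessageBDto` are hypothetical stand-ins, not the classes from the project.

```java
public class DoubleBraceDemo {

    // Hypothetical stand-in for the MessageBDto used in the question.
    static class MessageBDto {
        private String message;
        private int count;

        public String getMessage() { return message; }
        public void setMessage(String message) { this.message = message; }
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }

    // Double-brace initialization: the outer braces declare an anonymous
    // subclass of MessageBDto, the inner braces are an instance initializer.
    static MessageBDto doubleBrace() {
        return new MessageBDto() {{ setMessage("hi"); setCount(17); }};
    }

    // Plain construction: the runtime class is MessageBDto itself.
    static MessageBDto plain() {
        MessageBDto dto = new MessageBDto();
        dto.setMessage("hi");
        dto.setCount(17);
        return dto;
    }

    public static void main(String[] args) {
        // The anonymous subclass is named after the enclosing class plus "$1".
        System.out.println(doubleBrace().getClass().getName());
        // The plain instance reports the DTO class itself.
        System.out.println(plain().getClass().getName());
    }
}
```

If this is indeed the cause, building the DTO with ordinary setters (or a constructor) in the controller should make the type information refer to the DTO class in `org.xxx.xxx.dtos`, which is already in the trusted list shown in the exception.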