Я пытаюсь потреблять записи с помощью spring-kafka, но при запуске сервера получаю следующее исключение. Продюсер использует CDC через автономный (standalone) Confluent с MySQL. Автономным потребителем я могу корректно читать сообщения, но при потреблении через spring-kafka у меня возникает проблема.
Caused by: java.lang.NoClassDefFoundError: io/confluent/common/config/ConfigException
at java.lang.Class.forName0(Native Method) ~[na:1.8.0_131]
at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_131]
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:709) ~[kafka-clients-2.1.1.jar:na]
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471) ~[kafka-clients-2.1.1.jar:na]
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464) ~[kafka-clients-2.1.1.jar:na]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) ~[kafka-clients-2.1.1.jar:na]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) ~[kafka-clients-2.1.1.jar:na]
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:511) ~[kafka-clients-2.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617) ~[kafka-clients-2.1.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:139) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:134) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:102) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:425) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:259) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:269) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:164) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:269) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:289) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:238) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.0.8.RELEASE.jar:5.0.8.RELEASE]
... 12 common frames omitted
Caused by: java.lang.ClassNotFoundException: io.confluent.common.config.ConfigException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_131]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_131]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) ~[na:1.8.0_131]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_131]
... 32 common frames omitted
Зависимости
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
Конфиг с набором свойств
/**
 * Creates the listener container factory used by the Kafka listeners.
 *
 * <p>The factory is wired to {@link #consumerFactory()}, runs 5 concurrent
 * consumers and has an idle event interval of 3000.
 *
 * @return the configured concurrent listener container factory
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<GenericRecord, GenericRecord>> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();

    containerFactory.setConsumerFactory(consumerFactory());

    // Number of concurrent threads reading from the Kafka topic, i.e. the number
    // of consumers in the consumer group. It is 5 because the topic has
    // 5 partitions: one thread per partition.
    containerFactory.setConcurrency(5);

    // Interval in ms after which a "notification" is emitted when the consumer
    // is idle, e.g. there are no more messages in the Kafka topic.
    containerFactory.getContainerProperties().setIdleEventInterval(3000L);

    return containerFactory;
}
/**
 * Creates the consumer factory from the properties returned by
 * {@link #consumerConfigs()}.
 *
 * @return a {@code DefaultKafkaConsumerFactory} built from the consumer properties
 */
@Bean
public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
    final Map<String, Object> settings = consumerConfigs();
    return new DefaultKafkaConsumerFactory<>(settings);
}
/**
 * Assembles the Kafka consumer properties.
 *
 * <p>Sets the bootstrap server to {@code 127.0.0.1:9092}, a
 * {@code StringDeserializer} for keys, a {@code KafkaAvroDeserializer} for
 * values with the specific Avro reader switched on, the schema registry at
 * {@code http://localhost:8081} and the group id {@code avro}.
 *
 * @return a mutable map of consumer property names to values
 */
@Bean
public Map<String, Object> consumerConfigs() {
    final Map<String, Object> config = new HashMap<>();

    // Broker connection and consumer group.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");

    // Deserializers are passed by class name.
    final String keyDeserializer = StringDeserializer.class.getName();
    final String valueDeserializer = KafkaAvroDeserializer.class.getName();
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);

    // Avro-specific settings.
    config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    config.put("schema.registry.url", "http://localhost:8081");

    return config;
}
Потребитель, который пытается прочитать сообщение
/**
 * Listener for the {@code mysql-foobar} topic (group {@code group_id}) that
 * prints the value of each received record to standard output.
 *
 * @param message the consumed record
 */
@KafkaListener(topics = "mysql-foobar", groupId = "group_id")
public void consume(ConsumerRecord<String, Foobar> message) {
    final Foobar payload = message.value();
    System.out.println("consumed message is value: " + payload);
}