Ошибка при работе с реестром схем (Schema Registry) через spring-kafka - PullRequest
1 голос
/ 16 марта 2019

Я пытаюсь потреблять записи с помощью spring-kafka, но при запуске сервера получаю следующее исключение. Продюсер использует CDC через автономный (standalone) Confluent с MySQL. Автономный потребитель читает эти сообщения корректно, а вот потребление через spring-kafka у меня не работает.

Caused by: java.lang.NoClassDefFoundError: io/confluent/common/config/ConfigException
    at java.lang.Class.forName0(Native Method) ~[na:1.8.0_131]
    at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_131]
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:709) ~[kafka-clients-2.1.1.jar:na]
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471) ~[kafka-clients-2.1.1.jar:na]
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464) ~[kafka-clients-2.1.1.jar:na]
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) ~[kafka-clients-2.1.1.jar:na]
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) ~[kafka-clients-2.1.1.jar:na]
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:511) ~[kafka-clients-2.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617) ~[kafka-clients-2.1.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:139) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:134) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:102) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:425) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:259) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:269) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:164) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:269) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:289) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:238) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    ... 12 common frames omitted
Caused by: java.lang.ClassNotFoundException: io.confluent.common.config.ConfigException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_131]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_131]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) ~[na:1.8.0_131]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_131]
    ... 32 common frames omitted

Зависимости

<!-- Apache Avro library. -->
<dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>
        <!-- Confluent Avro serializer artifact; presumably the source of the KafkaAvroDeserializer
             referenced in the consumer configuration (verify against the Confluent docs). -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.1.0</version>
        </dependency>
        <!-- Kafka client library; the same kafka-clients-2.1.1.jar appears in the stack trace. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.1</version>
        </dependency>

Конфиг с набором свойств

/**
 * Builds the listener container factory used by {@code @KafkaListener} methods.
 *
 * <p>The factory is wired with {@link #consumerFactory()}, runs five concurrent
 * consumers and asks the containers to publish an idle event after 3000 ms
 * without messages.
 *
 * @return the configured container factory
 */
@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<GenericRecord, GenericRecord>> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());

        // Number of consumer threads in the consumer group: the topic has
        // 5 partitions, so one thread is dedicated to each partition.
        containerFactory.setConcurrency(5);

        // Interval (ms) after which an idle "notification" is emitted when the
        // consumer has nothing left to read from the topic.
        containerFactory.getContainerProperties().setIdleEventInterval(3000L);

        return containerFactory;
    }

    /**
     * Creates the consumer factory backed by the properties from {@link #consumerConfigs()}.
     *
     * <p>NOTE(review): the factory is declared with {@code GenericRecord} keys, while
     * {@code consumerConfigs()} registers {@code StringDeserializer} for keys; confirm
     * which key type is actually intended.
     *
     * @return a {@code DefaultKafkaConsumerFactory} built from the consumer properties
     */
    @Bean
    public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
        final Map<String, Object> consumerProperties = consumerConfigs();
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

    /**
     * Assembles the Kafka consumer properties: broker address, consumer group,
     * key/value deserializers and the Schema Registry settings used by the
     * Avro deserializer.
     *
     * @return a mutable map of consumer configuration properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        final Map<String, Object> config = new HashMap<>();

        // Connection and consumer group.
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");

        // Deserializers: plain strings for keys, Avro for values.
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());

        // Avro / Schema Registry settings.
        config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        config.put("schema.registry.url", "http://localhost:8081");

        return config;
    }

Потребитель, который пытается прочитать сообщение

/**
 * Listens on the {@code mysql-foobar} topic and prints the value of every
 * received record to standard output.
 *
 * <p>NOTE(review): the listener overrides the group id with {@code "group_id"},
 * whereas {@code consumerConfigs()} sets {@code "avro"}; confirm which one is intended.
 *
 * @param message the record received from the topic
 */
@KafkaListener(topics = "mysql-foobar", groupId = "group_id")
    public void consume(ConsumerRecord<String, Foobar> message) {
        final Foobar payload = message.value();
        System.out.println("consumed message is value: " + payload);
    }

1 Ответ

0 голосов
/ 16 марта 2019

Недостающий класс `io.confluent.common.config.ConfigException` можно получить, добавив следующую зависимость:

<!-- Per this answer, supplies io.confluent.common.config.ConfigException,
     the class reported missing in the stack trace. -->
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>common-config</artifactId>
    <version>5.1.0</version>
</dependency>
...