IncompatibleClassChangeError при запуске Consumer - PullRequest
0 голосов
/ 20 февраля 2020

Я пытаюсь использовать реактор-кафку в моей банке. Ниже pom. xml.

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>reactive-kakfa</artifactId>
    <version>0.0.30-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.projectreactor.kafka/reactor-kafka -->
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
            <version>1.2.2.RELEASE</version>
        </dependency>
    </dependencies>
</project>

Ниже приведен код пользователя kafka, написанный

    public Mono SampleConsumer(String bootstrapServers) {
        System.out.printf("Received message in bean");
        Map<String, Object> consumerProps = new HashMap();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("group.id", "sample-group");
        consumerProps.put("key.deserializer", IntegerDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        ReceiverOptions<Object, Object> receiverOptions = ReceiverOptions.<Object, Object>create(consumerProps)
                .subscription(Collections.singleton(TOPIC));
        Flux<ReceiverRecord<Object, Object>> inboundFlux =
                KafkaReceiver.create(receiverOptions)
                        .receive();
        inboundFlux.subscribe(r -> {
            System.out.printf("Received message: %s\n", r);
            r.receiverOffset().acknowledge();
        });

        return Mono.just("some msg");
    }

Я сталкиваюсь с этой проблемой при создании объекта ReceiverOptions

java.lang.IncompatibleClassChangeError: Found class reactor.kafka.receiver.ReceiverOptions, but interface was expected

Я использую kafka-2.3.1 и пробовал чистую сборку, также сделал недействительным кеш и перезапустил intellij. Любая помощь с этим будет принята с благодарностью.

...