Я пытаюсь использовать реактор-кафку в моей банке. Ниже pom. xml.
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>reactive-kakfa</artifactId>
<version>0.0.30-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor.kafka/reactor-kafka -->
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
</dependencies>
</project>
Ниже приведен код пользователя kafka, написанный
public Mono SampleConsumer(String bootstrapServers) {
System.out.printf("Received message in bean");
Map<String, Object> consumerProps = new HashMap();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("group.id", "sample-group");
consumerProps.put("key.deserializer", IntegerDeserializer.class);
consumerProps.put("value.deserializer", StringDeserializer.class);
ReceiverOptions<Object, Object> receiverOptions = ReceiverOptions.<Object, Object>create(consumerProps)
.subscription(Collections.singleton(TOPIC));
Flux<ReceiverRecord<Object, Object>> inboundFlux =
KafkaReceiver.create(receiverOptions)
.receive();
inboundFlux.subscribe(r -> {
System.out.printf("Received message: %s\n", r);
r.receiverOffset().acknowledge();
});
return Mono.just("some msg");
}
Я сталкиваюсь с этой проблемой при создании объекта ReceiverOptions
java.lang.IncompatibleClassChangeError: Found class reactor.kafka.receiver.ReceiverOptions, but interface was expected
Я использую kafka-2.3.1 и пробовал чистую сборку, также сделал недействительным кеш и перезапустил intellij. Любая помощь с этим будет принята с благодарностью.