Я не хочу использовать @KafkaListener или @StreamListener, но я хочу вручную опросить kafka. Я использую библиотеку spring-cloud-starter-stream-kafka и у меня есть следующий Kafka Producer
@Autowired
private KafkaTemplate<byte[], byte[]> template;
public void sendMessages() {
IntStream.range(2)
.forEach(val -> {
template.send("kafka-topic", "hello".getBytes());
});
}
Я бы хотел вручную опросить ту же самую kafka topi c, используя spring-kafka. Я попробовал следующий потребитель
@Autowired
private ConsumerFactory consumerFactory;
public void processKafkaRecords() throws InterruptedException {
Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
consumer.subscribe(Arrays.asList("kafka-topic"));
ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
poll.forEach(record -> {
log.info("record {}", record);
});
}
application.properties
spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true
spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false
Однако потребитель никогда не получает никаких записей, отправленных производителем. Есть идеи как вручную опросить кафку топи c?