Spring Cloud Stream Ручной Поллер Кафка - PullRequest
0 голосов
/ 05 февраля 2020

Я не хочу использовать @KafkaListener или @StreamListener, но я хочу вручную опросить kafka. Я использую библиотеку spring-cloud-starter-stream-kafka и у меня есть следующий Kafka Producer

  @Autowired
  private KafkaTemplate<byte[], byte[]> template;

  public void sendMessages() {
    IntStream.range(2)
             .forEach(val -> {
               template.send("kafka-topic", "hello".getBytes());
             });
  }

Я бы хотел вручную опросить ту же самую kafka topi c, используя spring-kafka. Я попробовал следующий потребитель

 @Autowired
  private ConsumerFactory consumerFactory;

  public void processKafkaRecords() throws InterruptedException {
    Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
    consumer.subscribe(Arrays.asList("kafka-topic"));
    ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
    poll.forEach(record -> {
      log.info("record {}", record);
    });
  }

application.properties

spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true

spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false

Однако потребитель никогда не получает никаких записей, отправленных производителем. Есть идеи как вручную опросить кафку топи c?

1 Ответ

2 голосов
/ 06 февраля 2020

Причин может быть несколько:

  1. Duration.ofMillis(1000) - попробуйте увеличить время, в некоторых случаях 1 может быть слишком низким, если и ваш клиент, и kafka не работают на одном компьютере. Поскольку в документации poll(Duration) говорится Если время ожидания истекло, будет возвращен пустой набор записей
  2. Если вы сначала запустили производителя, а затем потребителя и вы не установили стратегию сброса смещения как можно более раннюю, тогда вы не увидите никаких сообщений, потому что потребитель будет по умолчанию использовать последнее смещение. Поэтому попробуйте установить следующее auto.offset.reset=earliest
  3. Возможно, запущен другой потребитель из той же группы, и что существует только 1 раздел, или группа потребителей уже находится в последнем смещении. В этом случае вы можете попробовать изменить идентификатор группы потребителей.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...