Опрос потребителей Kafka со смещением 0 не возвращает сообщение - PullRequest
0 голосов
/ 12 сентября 2018

Я использую spring-kafka для опроса сообщения, когда я использую аннотацию для потребителя и устанавливаю смещение на 0, оно будет видеть все сообщения от самого раннего.Но когда я пытаюсь использовать внедренный ConsumerFactory для создания потребителя самостоятельно, тогда опрос будет возвращать только несколько сообщений или вообще никаких сообщений.Есть ли какой-то другой конфиг, который мне нужен для того, чтобы можно было вытащить сообщение?Время ожидания опроса уже установлено на 10 секунд.

@Component
public class GenericConsumer {
  private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);

  @Autowired
  ConsumerFactory<String, Record> consumerFactory;

  public ConsumerRecords<String, Record> poll(String topic, String group){
    logger.info("---------- Polling kafka recrods from topic " + topic + " group" + group);
    Consumer<String, Record> consumer = consumerFactory.createConsumer(group, "");
    consumer.subscribe(Arrays.asList(topic));
    // need to make a dummy poll before we can seek
    consumer.poll(1000);
    consumer.seekToBeginning(consumer.assignment());
    ConsumerRecords<String, Record> records;
    records = consumer.poll(10000);
    logger.info("------------ Total " + records.count() + " records polled");
    consumer.close();
    return records;
  }
}

1 Ответ

0 голосов
/ 12 сентября 2018

У меня нормально работает, это было с загрузкой 2.0.5, Spring Kafka 2.1.10 ...

@SpringBootApplication
public class So52284259Application implements ConsumerAwareRebalanceListener {

    private static final Logger logger = LoggerFactory.getLogger(So52284259Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So52284259Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, GenericConsumer consumer) {
        return args -> {
//          for (int i = 0; i < 1000; i++) { // load up the topic on first run
//              template.send("so52284259", "foo" + i);
//          }
            consumer.poll("so52284259", "generic");
        };
    }

    @KafkaListener(id = "listener", topics = "so52284259")
    public void listen(String in) {
        if ("foo999".equals(in)) {
            logger.info("@KafkaListener: " + in);
        }
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        consumer.seekToBeginning(partitions);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so52284259", 1, (short) 1);
    }

}

@Component
class GenericConsumer {

    private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);

    @Autowired
    ConsumerFactory<String, String> consumerFactory;

    public void poll(String topic, String group) {
        logger.info("---------- Polling kafka recrods from topic " + topic + " group" + group);
        Consumer<String, String> consumer = consumerFactory.createConsumer(group, "");
        consumer.subscribe(Arrays.asList(topic));
        // need to make a dummy poll before we can seek
        consumer.poll(1000);
        consumer.seekToBeginning(consumer.assignment());
        ConsumerRecords<String, String> records;
        boolean done = false;
        while (!done) {
            records = consumer.poll(10000);
            logger.info("------------ Total " + records.count() + " records polled");
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            while (iterator.hasNext()) {
                String value = iterator.next().value();
                if ("foo999".equals(value)) {
                    logger.info("Consumer: " + value);
                    done = true;
                }
            }
        }
        consumer.close();
    }

}

и

2018-09-12 09:35:25.929  INFO 61390 --- [           main] com.example.GenericConsumer              : ------------ Total 500 records polled
2018-09-12 09:35:25.931  INFO 61390 --- [           main] com.example.GenericConsumer              : ------------ Total 500 records polled
2018-09-12 09:35:25.932  INFO 61390 --- [           main] com.example.GenericConsumer              : Consumer: foo999
2018-09-12 09:35:25.942  INFO 61390 --- [ listener-0-C-1] com.example.So52284259Application        : @KafkaListener: foo999
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...