Потребитель Кафки не получает старые сообщения - PullRequest
0 голосов
/ 12 апреля 2020

Клиент Kafka не получает сообщения, созданные до его запуска.

 public class MyKafkaConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final String TOPIC="javaapp";
    private final String BOOTSTRAP_SERVERS="localhost:9092";
    private int receivedCounter=0;
    private ExecutorService executorService=Executors.newFixedThreadPool(1);

    private BlockingQueue<ConsumerRecords<String, String>> queue=new LinkedBlockingQueue<>(500000);
    private MyKafkaConsumer() {
        final Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaGroup6");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumer=new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
    }

    public static void main(String[] args) throws InterruptedException {
        MyKafkaConsumer perfKafkaConsumer=new MyKafkaConsumer();
        perfKafkaConsumer.consumeMessage();
        perfKafkaConsumer.runConsumer();
    }

    private void runConsumer() throws InterruptedException {
        consumer.poll(Duration.ofMillis(1000));
        while (true) {
            final ConsumerRecords<String, String> consumerRecords=consumer.poll(Duration.ofMillis(10000));
            if (!consumerRecords.isEmpty()) {
                System.out.println("Adding result in queue " + queue.size());
                queue.put(consumerRecords);
            }
            consumer.commitAsync();

        }
    }

    private void consumeMessage() {
        System.out.println("Consumer starts at " + Instant.now());
        executorService.submit(() -> {
            while (true) {
                ConsumerRecords<String, String> poll=queue.take();
                poll.forEach(record -> {
                    System.out.println("Received " + ++receivedCounter + " time " + Instant.now(Clock.systemUTC()));
                });

            }
        });
    }
}

ConsumerRecords всегда пусты

Я проверил смещение с помощью инструмента Kafka enter image description here

Я также пробовал с другим именем группы, оно не работает. Та же проблема, т. Е. Опрос возвращает пустые записи enter image description here

Хотя, если я начну своего потребителя раньше, чем производитель, чем он получает сообщения. (Кафка-клиент версия 2.4.1)

1 Ответ

0 голосов
/ 12 апреля 2020

Параметр потребителя auto.offset.reset определяет, где новая группа потребителей начнет потреблять с topi c. По умолчанию для него установлено значение «последний», которое устанавливает смещение групп потребителей на последнее смещение. Вы хотите установить это значение как «самое раннее», если все группы потребителей должны начинаться с самого раннего смещения в топи c.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...