Как использовать данные из темы кафки в конкретном смещении для конкретного смещения? - PullRequest
2 голосов
/ 02 июня 2019

Мне нужно использовать конкретное смещение для конкретного конечного смещения!consumer.seek () считывает данные с определенного смещения, но мне нужно получить данные от смещения к смещению !!Любая помощь будет признательна, спасибо заранее.

    ConsumerRecords<String, String> records = consumer.poll(100);
    if(flag) {
        consumer.seek(new TopicPartition("topic-1", 0), 90);
        flag = false;
    }

1 Ответ

4 голосов
/ 02 июня 2019

Чтобы прочитать сообщения от начального смещения до конечного смещения, сначала необходимо использовать seek(), чтобы переместить потребителя в нужное начальное местоположение, а затем poll(), пока вы не достигнете желаемого конечного смещения.

Например, потреблять со смещения от 100 до 200:

String topic = "test";
TopicPartition tp = new TopicPartition(topic, 0);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
    consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Move to the desired start offset 
            consumer.seek(tp, 100L);
        }
    });
    boolean run = true;
    long lastOffset = 200L;
    while (run) {
        ConsumerRecords<String, String> crs = consumer.poll(Duration.ofMillis(100L));
        for (ConsumerRecord<String, String> record : crs) {
            System.out.println(record);
            if (record.offset() == lastOffset) {
                // Reached the end offsey, stop consuming
                run = false;
                break;
            }
        }
    }
}
...