Как мне настроить в Kafka, чтобы он не потреблял там, где он остался? - PullRequest
0 голосов
/ 06 августа 2020

У меня есть потребитель Kafka в Golang. Я не хочу получать информацию с того места, где я ушел в прошлый раз, а хочу получать текущее сообщение. Как мне это сделать?

Ответы [ 3 ]

0 голосов
/ 06 августа 2020

Вы можете установить для enable.auto.commit значение false и auto.offset.reset как последнее значение для идентификатора вашей группы потребителей. Это означает, что kafka не будет автоматически фиксировать ваши смещения.

Если auto commit отключен, прогресс вашей группы потребителей не будет сохранен (если вы не сделаете это вручную). Таким образом, всякий раз, когда потребитель перезапускается по какой-либо причине, он не обнаруживает, что его прогресс сохранен, и сбрасывает на последнее смещение.

0 голосов
/ 08 августа 2020

Apache API-интерфейс потребителя kafka предоставляет метод под названием kafkaConsumer.seekToEnd (), который можно использовать для игнорирования существующих сообщений и использования только сообщений, опубликованных после запуска потребителя, без изменения текущего идентификатора группы потребителя. Ниже приведена реализация того же самого. Программа принимает 3 аргумента: topi c имя, идентификатор группы и диапазон смещения (0 для начала с начала, - 1 для получения сообщений после запуска потребителя, кроме 0 или - 1 будет означать, что потребителю будет потребляться с этого смещения )

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.util.*;

public class Consumer {
    private static Scanner in;

    public static void main(String[] argv)throws Exception{
        if (argv.length != 3) {
            System.err.printf("Usage: %s <topicName> <groupId> <startingOffset>\n",
                    Consumer.class.getSimpleName());
            System.exit(-1);
        }
        in = new Scanner(System.in);

        String topicName = argv[0];
        String groupId = argv[1];
        final long startingOffset = Long.parseLong(argv[2]);

        ConsumerThread consumerThread = new ConsumerThread(topicName,groupId,startingOffset);
        consumerThread.start();
        String line = "";
        while (!line.equals("exit")) {
            line = in.next();
        }
        consumerThread.getKafkaConsumer().wakeup();
        System.out.println("Stopping consumer .....");
        consumerThread.join();

    }

    private static class ConsumerThread extends Thread{
        private String topicName;
        private String groupId;
        private long startingOffset;
        private KafkaConsumer<String,String> kafkaConsumer;

        public ConsumerThread(String topicName, String groupId, long startingOffset){
            this.topicName = topicName;
            this.groupId = groupId;
            this.startingOffset=startingOffset;
        }
        public void run() {
            Properties configProperties = new Properties();
            configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset123");
            configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
            configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

            //Figure out where to start processing messages from
            kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
            kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
                }
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
                    Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
                    while(topicPartitionIterator.hasNext()){
                        TopicPartition topicPartition = topicPartitionIterator.next();
                        System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is ->" + kafkaConsumer.committed(topicPartition) );
                        if(startingOffset == -2) {
                            System.out.println("Leaving it alone");
                        }else if(startingOffset ==0){
                            System.out.println("Setting offset to begining");

                            kafkaConsumer.seekToBeginning(topicPartition);
                        }else if(startingOffset == -1){
                            System.out.println("Setting it to the end ");

                            kafkaConsumer.seekToEnd(topicPartition);
                        }else {
                            System.out.println("Resetting offset to " + startingOffset);
                            kafkaConsumer.seek(topicPartition, startingOffset);
                        }
                    }
                }
            });
            //Start processing messages
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                    if(startingOffset == -2)
                        kafkaConsumer.commitSync();
                }
            }catch(WakeupException ex){
                System.out.println("Exception caught " + ex.getMessage());
            }finally{
                kafkaConsumer.close();
                System.out.println("After closing KafkaConsumer");
            }
        }
        public KafkaConsumer<String,String> getKafkaConsumer(){
            return this.kafkaConsumer;
        }
    }
}
0 голосов
/ 06 августа 2020

устанавливает новый group.id для вашего потребителя. Затем используйте auto.offset.reset, чтобы определить поведение этой новой группы потребителей, в вашем случае: latest

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...