Чтение сообщений из Kafka topi c в диапазоне смещений - PullRequest
0 голосов
/ 06 марта 2020


Я ищу способ использовать некоторый набор сообщений из моего Kafka topi c с указанным c диапазоном смещения (предположим, что мой раздел имеет смещение 200–300, я хочу использовать сообщения от смещение 250-270).

Я использую код ниже, где я могу указать начальное смещение, но оно будет использовать все сообщения от 250 до конца. Есть ли способ / атрибуты, доступные для установки конечного смещения, чтобы использовать сообщения до этой точки.

    @KafkaListener(id = "KafkaListener",
        topics = "${kafka.topic.name}", 
        containerFactory = "kafkaManualAckListenerContainerFactory", 
        errorHandler = "${kafka.error.handler}",
        topicPartitions = @TopicPartition(topic = "${kafka.topic.name}",
        partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "250"), 
                @PartitionOffset(partition = "1", initialOffset = "250")
        }))

Ответы [ 2 ]

0 голосов
/ 06 марта 2020

Вы можете использовать seek(), чтобы заставить потребителя начать потреблять с указанного c смещения и затем poll(), пока не достигнете конечного конечного смещения.

public void seek(TopicPartition partition, long offset)

Переопределяет смещения выборки, которые потребитель будет использовать в следующем poll(timeout). Если этот API вызывается для одного и того же раздела более одного раза, последнее смещение будет использовано для следующего poll(). Обратите внимание, что вы можете потерять данные, если этот API произвольно используется в середине потребления, чтобы сбросить смещения выборки


Например, предположим, что вы хотите начать со смещения 200:

TopicPartition tp = new TopicPartition("myTopic", 0);
Long startOffset = 200L
Long endOffset = 300L

List<TopicPartition> topics = Arrays.asList(tp);
consumer.assign(topics);
consumer.seek(topicPartition, startOffset);

теперь вам просто нужно сохранять poll() ing до достижения endOffset:

boolean run = true;
while (run) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {

        // Do whatever you want to do with `record`

        // Check if end offset has been reached
        if (record.offset() == endOffset) {
            run = false;
            break;
        }
    }
}
0 голосов
/ 06 марта 2020
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

boolean keepOnReading = true;

// offset to read the data from.
long offsetToReadFrom = 250L; 

// seek is mostly used to replay data or fetch a specific message
// seek
kafkaConsumer.seek(partitionToReadFrom, offsetToReadFrom);

while(keepOnReading) {
   ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));

   for (ConsumerRecord<String, String> record : records) {
      numberOfMessagesRead ++;
      logger.info("Key: "+record.key() + ", Value: " + record.value());
      logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());

      if(record.offset() == 270L) {
        keepOnReading = false;
        break;
      }
   }
}

Надеюсь, это поможет вам !!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...