Java-код: получить последние n сообщений от потребителя кафки с определенной темой - PullRequest
1 голос
/ 18 апреля 2019

kafka версия: 0.9.0.1

Если n = 20, я должен получить последние 20 сообщений по теме.

сейчас я использую

kafkaConsumer.seekToBeginning();

Я получаю все сообщения.Я пишу некоторую логику, чтобы получить последние 20.

В моей теме могут быть сотни тысяч записей

public List<JSONObject> consumeMessages(String kafkaTopicName) {
  KafkaConsumer<String, String> kafkaConsumer = null;
  boolean flag = true;
  List<JSONObject> messagesFromKafka = new ArrayList<>();
  int recordCount = 0;
  int i = 0;
  int maxMessagesToReturn = 20;

  Properties props = new Properties();         
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "project.group.id");
  props.put("max.partition.fetch.bytes", "1048576000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  kafkaConsumer = new KafkaConsumer<>(props);

  kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
  TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
  LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
  while (flag) {
    // will consume all the messages and store in records
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);

    kafkaConsumer.seekToBeginning(topicPartition);

    // getting total records count
    recordCount = records.count();
    LOGGER.info("recordCount " + recordCount);
    for (ConsumerRecord<String, String> record : records) {
      if(record.value() != null) {
        if (i >= recordCount - maxMessagesToReturn) {
          // adding last 20 messages to messagesFromKafka
          LOGGER.info("kafkaMessage "+record.value());
          messagesFromKafka.add(new JSONObject(record.value()));
        }
        i++;
      }
    }
    if (recordCount > 0) {
      flag = false;
    }
  }
  kafkaConsumer.close();
  return messagesFromKafka;
}

1 Ответ

1 голос
/ 18 апреля 2019

Вы можете использовать kafkaConsumer.seekToEnd(Collection<TopicPartition> partitions) для поиска последнего смещения данного раздела (ов).Согласно документации:

"Поиск последнего смещения для каждого из заданных разделов. Эта функция выполняет ленивый поиск, стремясь к окончательному смещению во всех разделах, только когда poll(Duration) или position(TopicPartition)Если разделы не предоставлены, ищите окончательное смещение для всех назначенных в данный момент разделов. "

Затем вы можете получить положение определенного раздела, используя position(TopicPartition partition).

Затем вы можете уменьшить его на 20 и использовать kafkaConsumer.seek(TopicPartition partition, long offset) для получения последних 20 сообщений.

Просто,

kafkaConsumer.seekToEnd(partitionList);
long endPosition = kafkaConsumer.position(topicPartiton);
long recentMessagesStartPosition = endPosition - maxMessagesToReturn;
kafkaConsumer.seek(topicPartition, recentMessagesStartPosition);

Теперь вы можете получить самые последние 20 сообщений.использование poll()

Это простая логика, но если у вас есть несколько разделов, вы должны рассмотреть и эти случаи.Я не пробовал это, но надеюсь, вы получите концепцию.

...