Кафка читает все сообщения темы - PullRequest
0 голосов
/ 27 ноября 2018

Я хотел бы прочитать все сообщения из темы Кафки в запланированный интервал для вычисления некоторого значения глобального индекса.Я делаю что-то вроде этого:

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("group.id", "test")
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,Int.MaxValue.toString)

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(util.Collections.singletonList(TOPIC))
  consumer.poll(10000)
  consumer.seekToBeginning(consumer.assignment())
   val records = consumer.poll(10000)

с этим механизмом, я получаю все записи, но это эффективный способ сделать это?Это будет около 20000000 (2,1 ГБ) записей на тему.

1 Ответ

0 голосов
/ 27 ноября 2018

Возможно, вы решили использовать библиотеку Kafka Streams для этого.Поддерживаются разные типы окон.

  1. Падающее временное окно
  2. Прыгающее временное окно
  3. Скользящее временное окно
  4. Сеансовое окно

Вы можете использовать акробатические окна для захвата событий в заданном внутреннем и расчета вашего глобального индекса.

https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#windowing

...