Получить последнее вставленное сообщение из темы кафки - PullRequest
1 голос
/ 09 июля 2019

У меня есть требование, где мне нужно найти недавно вставленное сообщение из темы Кафки. Как мне этого добиться?

Я пытался сначала получить смещение и пытаться получить сообщения с этим смещением? Это эффективное решение?

val config = KafkaConfig()
  val props = new Properties()
//  ConsumerConfig
  props.put("bootstrap.servers", config.bootstrapServers)
  props.put("group.id", "stream-latest-consumer")
  props.put(
    "key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer"
  )
  props.put(
    "value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer"
  )
  val kafkaConsumer = new KafkaConsumer[String, String](props)

  val p = new TopicPartition(config.topic, 0)
  val cl: util.Collection[TopicPartition] = List(p).asJava
  val offsetsMap: java.util.Map[TopicPartition, java.lang.Long] =
    kafkaConsumer.endOffsets(cl)

  val offsetCount = offsetsMap.get(p)

1 Ответ

1 голос
/ 09 июля 2019

Вы также можете использовать

void seekToEnd(Collection<TopicPartition> partitions)

для получения последнего смещения для заданных разделов.

...