У меня есть сценарий от производителя консоли, я создал 4 сообщения, и с помощью spark я смог получить эти 4 сообщения с помощью метода spark.read (). Теперь сценарий такой: когда я создаю еще 3 сообщения, я просто хочу получить последние 3 сообщения, а не эти 4 сообщения, используя spark.read (). Данные не поступают в виде потока.
spark.read () имеет параметры "initialOffset" и "endOffset", поэтому я подумал, что если я смогу получить смещение последнего сообщения из темы, тогда оно подойдет.
Я попробовал пару вещей:
- Создав второго потребителя и подписав его на это сообщение, позже используйте consumer.commited (topicAndPartition) для получения метаданных, а затем смещения от этих метаданных.
Проблемы с этим подходом:
Он работает, однако, метод consumer.commited () возвращает значение null, и из-за этого consumer.commited (topicAndPartition) .offset (). ToLong возвращает исключение NullPointerException.
Вот код.
val properties = new Properties()
properties.put("bootstrap.servers", "localhost:9092")
properties.put("group.id","12238868")
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.put("enable.auto.commit","true")
val consumer = new KafkaConsumer[String, String](properties)
val topic = Array("MessagePassingTopic")
val topicAndPartition = new org.apache.kafka.common.TopicPartition("MessagePassingTopic", 0)
consumer.subscribe(util.Arrays.asList("MessagePassingTopic"))
val offsetAndMetadata = consumer.committed(topicAndPartition)
val endOffset = offsetAndMetadata.offset().toLong
Я попытался извлечь записи с помощью .poll () и получить смещение последнего сообщения, но список записей отображается пустым, однако это не так. У меня есть сообщения в этой теме, и я проверил с помощью консоли. Код выглядит примерно так.
val recordsFromConsumer = consumer.poll(10000)
val recordsFromConsumerList = recordsFromConsumer.records("MessagePassingTopic").toList
val lastOffset = recordsFromConsumerList.last.offset()```
Проблемы с этим подходом:
recordsFromConsumerList не возвращает никаких записей. Таким образом, нет шансов получить последнюю запись, и поэтому .last.offset () тоже не работает.
Если есть какой-то лучший способ выполнить эту задачу или что-то, что я делаю неправильно, пожалуйста, дайте мне знать. Любая помощь приветствуется.