У меня 1000 сообщений в моей теме. Мне нужно написать kafka
потребителя в scala
, чтобы просто получить 1000 сообщений, чтобы я мог начать обработку 1000 сообщений.
var recordList = new ListBuffer[ConsumerRecord[String, String]]()
while (true) {
val records: ConsumerRecords[String, String] = consumer.poll(100)
records.asScala.foreach(record => recordList += record)
recordList.toList
}
Но чтопроисходит, когда цикл никогда не заканчивается, и я получаю следующие сообщения в журнале.
Fetch READ_UNCOMMITTED at offset 1000 for partition test-0 returned fetch data (error=NONE, highWaterMark=1000, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=10486)
Added READ_UNCOMMITTED fetch request for partition test-0 at offset 1000 to node localhost:9092 (id: 0 rack: null)
Sending READ_UNCOMMITTED fetch for partitions [test-0] to broker localhost:9092 (id: 0 rack: null)