Есть ли способ потреблять все сообщения из темы кафки и прекращать опрос после этого? - PullRequest
0 голосов
/ 29 ноября 2018

У меня 1000 сообщений в моей теме. Мне нужно написать kafka потребителя в scala, чтобы просто получить 1000 сообщений, чтобы я мог начать обработку 1000 сообщений.

 var recordList = new ListBuffer[ConsumerRecord[String, String]]()
   while (true) {
     val records: ConsumerRecords[String, String] = consumer.poll(100)
     records.asScala.foreach(record => recordList += record) 
     recordList.toList
     }

Но чтопроисходит, когда цикл никогда не заканчивается, и я получаю следующие сообщения в журнале.

Fetch READ_UNCOMMITTED at offset 1000 for partition test-0 returned fetch data (error=NONE, highWaterMark=1000, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=10486)

Added READ_UNCOMMITTED fetch request for partition test-0 at offset 1000 to node localhost:9092 (id: 0 rack: null)

Sending READ_UNCOMMITTED fetch for partitions [test-0] to broker localhost:9092 (id: 0 rack: null)

Ответы [ 2 ]

0 голосов
/ 29 ноября 2018

Другим способом было бы закрыть потребителя, когда records.size () равен нулю

0 голосов
/ 29 ноября 2018

Почему бы вам не выйти, когда records.size () равен нулю?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...