Я тестирую async send () в моем производителе кафки.
Кластер, к которому я хочу подключиться, находится в автономном режиме.
Я предполагаю, что я быстро отправлю 10000 индивидуальных запросов (длина listToSend).
Затем наступит тайм-аут (60 с), и через 60 секунд я увижу, что обратные вызовы поразили меня logger.error(s"failed to send record ${x._2}", e)
Однако, похоже, что для завершения метода потребуется вечность.
Вот почему я добавил в строку logger.debug("test: am I sending data")
.
Он печатает, затем ничего не происходит в течение 60 секунд. Я вижу неудачный обратный вызов для 1-й записи. И только тогда он будет двигаться дальше.
Это нормальное поведение или я упускаю что-то фундаментальное?
listToSend.foreach { x =>
logger.debug("test: am I sending data")
// note: I added this 'val future =' in an attempt to fix this, to no avail
val future = producer.send(new ProducerRecord[String, String](topic, x._2), new Callback {
override def onCompletion(metadata: RecordMetadata, e: Exception) {
if (e != null) {
//todo: handle failed sends, timeouts, ...
logger.error(s"failed to send record ${x._2}", e)
}
else { //nice to have: implement logic here, or call another method to process metadata
logger.debug("~Callback success~")
}
}
}
)
}
примечание: Я не хочу блокировать этот код, я хочу сохранить его асинхронным. Тем не менее, похоже, что он блокирует send () независимо.