Если вы передвинете new MyCallback(config)
перед циклом for, это, вероятно, поможет.
Тогда у вас есть один экземпляр, в который можно собирать исключения.
Плюс объект обратного вызова будет находиться в области видимости послецикл, и вы можете получить список статусов
Мой скала немного ржавый, но я думаю, что это показывает идею
class MyCallback (config: Map[String, String]) extends Callback {
private val exceptions = _ // some mutable List
def getExceptions() { return exceptions }
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null) {
exceptions.add(exception)
}
}
}
Тогда
val cb = new MyCallback(config)
for (message <- messageList) {
producer.send(new ProducerRecord[String, MyMessage](config("topic"), message ), cb)
}
// TODO: check cb.getExceptions().size > 0
Иливы можете попытаться выполнить ранний отказ, проверив внутри цикла, но это будет асинхронно и, вероятно, не будет работать слишком хорошо