Scala: Как определить статус всех нескольких сообщений, отправленных через Kafka асинхронно? - PullRequest
0 голосов
/ 06 апреля 2019

Я использую Scala для публикации сообщений в Kafka. У меня есть список сообщений для публикации на kafka, возможно, асинхронно. Однако, в конце, если есть какая-либо ошибка в публикации хотя бы одного сообщения, я должен зафиксировать это и принять меры. Я знаю о методе обратного вызова, но он будет работать по одному на сообщение. Я хочу накопить или выяснить способ всех таких статусов и решить, если ничего не вышло. Что-то вроде статуса партии.

for (message <- messageList) {
    // Create a message
    // producer is created
    // I have a separate util package, having the MyCallback class with overridden onCompletion method.
    producer.send(new ProducerRecord[String, MyMessage](config("topic"), message ), new MyCallback(config))

}
// here, I need help, to see if any of the messages sent is failed or not.
// How to capture the statuses of callback here for entire batch?


// Separate class file
class MyCallback (config: Map[String, String]) extends Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
     // if successful meta data, do something
     // if exception, I can log the error.

    }
}

Обычно я могу проверить состояние отдельных сообщений в методе класса обратного вызова.

Как отслеживать эти статусы после завершения цикла for и предпринимать дальнейшие действия на этом основании?

1 Ответ

0 голосов
/ 06 апреля 2019

Если вы передвинете new MyCallback(config) перед циклом for, это, вероятно, поможет.
Тогда у вас есть один экземпляр, в который можно собирать исключения.
Плюс объект обратного вызова будет находиться в области видимости послецикл, и вы можете получить список статусов

Мой скала немного ржавый, но я думаю, что это показывает идею

class MyCallback (config: Map[String, String]) extends Callback {

    private val exceptions = _ // some mutable List

    def getExceptions() { return exceptions }

    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
     if (exception != null) {
         exceptions.add(exception)
     }
    }
}

Тогда

val cb = new MyCallback(config)
for (message <- messageList) {
    producer.send(new ProducerRecord[String, MyMessage](config("topic"), message ), cb)
}

// TODO: check cb.getExceptions().size > 0

Иливы можете попытаться выполнить ранний отказ, проверив внутри цикла, но это будет асинхронно и, вероятно, не будет работать слишком хорошо

...