Вы должны контролировать свой поток и перезапустить его, если есть ошибки.Например, вы можете запустить свой поток внутри актера и обрабатывать соединения с ошибками под наблюдением актера.
Ошибки соединений, вероятно, будут длиться в течение пары секунд (возможно, сеть перегружена), поэтому вам следует использовать обратнуюОтключить стратегию, чтобы избежать повторных штормов.
Поток Akka уже дает вам простой способ сделать это для потоков, использующих RestartSource
.См. Обработка ошибок
val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)
val result = RestartSource
.onFailuresWithBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
) { () =>
Consumer
.plainSource(consumerSettings, Subscriptions.topics(topic))
// this is a hack to get access to the Consumer.Control
// instances of the latest Kafka Consumer source
.mapMaterializedValue(c => control.set(c))
.via(businessFlow)
}
.runWith(Sink.seq)
control.get().shutdown()
Это решение будет работать только тогда, когда вы запускаете поток, а посредник не работает, потому что тогда потребитель выдает исключение при попытке его создать.Однако, если вам удастся создать своего потребителя, и после этого весь кластер kafka выйдет из строя, внутренняя KafkaConsumer будет использовать упомянутую конфигурацию reconnect.backoff.ms
и reconnect.backoff.max.ms
для повторного подключения, и ваш поток не потерпит неудачу.
Если вы хотитеограничить количество пенсионеров, вы должны сделать следующее
val result: Future[Done] = RestartSource
.onFailuresWithBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
) { () => // your consumer
}.
.take(3) // retries limit
.runWith(Sink.ignore)
result.onComplete {
case _ => println("Max retries reached")
}