В соответствии с документацией ,
При сбое потока поток данных обрабатывает все внутренние ресурсы библиотеки.
(де) сериализация
Если чтение из-за сбоя Kafka вызвано другими причинами, такими как проблемы десериализации, этап немедленно завершится неудачей.Если вы ожидаете таких случаев, рассмотрите возможность использования необработанных байтовых массивов и десериализации на следующем этапе карты, где вы можете использовать наблюдение для пропуска неудачных элементов.
Рекомендуется (де) сериализовать сообщения с использованием байтовых массивов в качестве значения и выполнять (де) сериализацию в операции отображения в потоке Акка вместо того, чтобы реализовывать ее непосредственно в Кафке (де) сериализаторы.
Пример с Spray JSON (Этот пример использует возобновление для реагирования на данные, которые не могут быть правильно проанализированы, и игнорирует неисправные элементы):
import spray.json._
final case class SampleData(name: String, value: Int)
object SampleDataSprayProtocol extends DefaultJsonProtocol {
implicit val sampleDataProtocol: RootJsonFormat[SampleData] = jsonFormat2(SampleData)
}
import SampleDataSprayProtocol._
val resumeOnParsingException = ActorAttributes.withSupervisionStrategy {
new akka.japi.function.Function[Throwable, Supervision.Directive] {
override def apply(t: Throwable): Supervision.Directive = t match {
case _: spray.json.JsonParser.ParsingException => Supervision.Resume
case _ => Supervision.stop
}
}
}
val consumer = Consumer
.plainSource(consumerSettings, Subscriptions.topics(topic))
.map { consumerRecord =>
val value = consumerRecord.value()
val sampleData = value.parseJson.convertTo[SampleData]
sampleData
}
.withAttributes(resumeOnParsingException)
.toMat(Sink.seq)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()