Принудительно заставляет потребителя alpakka kafka показывать сообщение об ошибке десериализации - PullRequest
0 голосов
/ 14 июня 2019

Потребитель Alpakka kafka обрабатывает записи до тех пор, пока не обнаружит запись, которую он не может десериализовать, и тихо умирает, не оставляя сообщения об ошибке. Как заставить его сообщить об ошибке?

1 Ответ

0 голосов
/ 14 июня 2019

В соответствии с документацией ,

При сбое потока поток данных обрабатывает все внутренние ресурсы библиотеки.

(де) сериализация

Если чтение из-за сбоя Kafka вызвано другими причинами, такими как проблемы десериализации, этап немедленно завершится неудачей.Если вы ожидаете таких случаев, рассмотрите возможность использования необработанных байтовых массивов и десериализации на следующем этапе карты, где вы можете использовать наблюдение для пропуска неудачных элементов.

Рекомендуется (де) сериализовать сообщения с использованием байтовых массивов в качестве значения и выполнять (де) сериализацию в операции отображения в потоке Акка вместо того, чтобы реализовывать ее непосредственно в Кафке (де) сериализаторы.

Пример с Spray JSON (Этот пример использует возобновление для реагирования на данные, которые не могут быть правильно проанализированы, и игнорирует неисправные элементы):

import spray.json._

final case class SampleData(name: String, value: Int)

object SampleDataSprayProtocol extends DefaultJsonProtocol {
  implicit val sampleDataProtocol: RootJsonFormat[SampleData] = jsonFormat2(SampleData)
}

import SampleDataSprayProtocol._

    val resumeOnParsingException = ActorAttributes.withSupervisionStrategy {
      new akka.japi.function.Function[Throwable, Supervision.Directive] {
        override def apply(t: Throwable): Supervision.Directive = t match {
          case _: spray.json.JsonParser.ParsingException => Supervision.Resume
          case _ => Supervision.stop
        }
      }
    }

    val consumer = Consumer
      .plainSource(consumerSettings, Subscriptions.topics(topic))
      .map { consumerRecord =>
        val value = consumerRecord.value()
        val sampleData = value.parseJson.convertTo[SampleData]
        sampleData
      }
      .withAttributes(resumeOnParsingException)
      .toMat(Sink.seq)(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()
...