akka stream alpakka csv: пропустить исключение и проанализировать следующие строки - PullRequest
0 голосов
/ 29 ноября 2018

Я использую Alpakka для разбора CSV-файлов.версия "com.lightbend.akka" %% "akka-stream-alpakka-csv"% 0,20 У меня есть CSV-файл с незамкнутой кавычкой.

email
test@emample.com
"test@emample.com
test@emample.com
test@emample.com

Я хочу пропустить плохие строки и перейти к следующему, но мой поток падает.

Я использую наблюдениеStrategy Supervision.Resume, но он не работает.

Поток не удается найти незакрытой кавычки.

Есть ли способ исправить это?

мой код:

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()

def hdfsSource(csv: String): Source[ByteString, Future[IOResult]] =
  Source
    .single(csv)
    .map(ByteString.apply)
    .mapMaterializedValue(_ => Future.successful(IOResult(1, Success(Done))))

val csv = """email,country,name
            |"test,test,test
            |test,test,test
            |test,test,test
            |""".stripMargin

val source = hdfsSource(csv)

val decider: Supervision.Decider = {
  case _ ⇒ Supervision.Resume
}

val result = source
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMapAsStrings())
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runForeach(println)

1 Ответ

0 голосов
/ 30 ноября 2018

В настоящее время CsvParsing.lineScanner() не поддерживает стратегии контроля.Вы можете выбрать другой символ в качестве символа кавычки для сканера строки CsvParsing.lineScanner(quoteChar = '\'').Тогда вы получите закрытую двойную кавычку как часть проанализированных результатов:

Map(email -> "test, country -> test, name -> test) Map(email -> test, country -> test, name -> test) Map(email -> test, country -> test, name -> test)

...