Я использую Alpakka для разбора CSV-файлов.версия "com.lightbend.akka" %% "akka-stream-alpakka-csv"% 0,20 У меня есть CSV-файл с незамкнутой кавычкой.
email
test@emample.com
"test@emample.com
test@emample.com
test@emample.com
Я хочу пропустить плохие строки и перейти к следующему, но мой поток падает.
Я использую наблюдениеStrategy Supervision.Resume, но он не работает.
Поток не удается найти незакрытой кавычки.
Есть ли способ исправить это?
мой код:
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
def hdfsSource(csv: String): Source[ByteString, Future[IOResult]] =
Source
.single(csv)
.map(ByteString.apply)
.mapMaterializedValue(_ => Future.successful(IOResult(1, Success(Done))))
val csv = """email,country,name
|"test,test,test
|test,test,test
|test,test,test
|""".stripMargin
val source = hdfsSource(csv)
val decider: Supervision.Decider = {
case _ ⇒ Supervision.Resume
}
val result = source
.via(CsvParsing.lineScanner())
.via(CsvToMap.toMapAsStrings())
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runForeach(println)