Сначала я этого не осознавал, но, поскольку мы получаем все поля из одного непрерывного потока, мы не можем извлечь одно из полей для последующего использования в потоке с помощью Source[T]
, хотя akka-streams позволяют нам это делать.
Таким образом, каждая часть составного запроса должна быть очищена перед обработкой следующего.
Также обратите внимание, что следующая функция будет собирать только текстовые поля, предшествующие двоичному файлу.
def fileUploadWithFields(fieldName: String): Directive1[(Map[String, String], FileInfo, Source[ByteString, Any])] =
entity(as[Multipart.FormData]).flatMap { formData ⇒
extractRequestContext.flatMap { ctx ⇒
implicit val mat: Materializer = ctx.materializer
// Because it's continuous stream of fields we MUST consume each field before switching to next one. [https://stackoverflow.com/q/52765993/226895]
val fut = formData.parts
.takeWhile(part ⇒ !(part.filename.isDefined && part.name == fieldName), inclusive = true)
.fold((Map.empty[String, String], Option.empty[(FileInfo, Source[ByteString, Any])])) { case ((fields, pairOpt), part) ⇒
if (part.filename.nonEmpty && part.name == fieldName) {
//println(s"Got file field: $part")
fields → Some((FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes))
} else if (part.filename.isEmpty && part.entity.contentType.mediaType.isText && part.entity.isInstanceOf[HttpEntity.Strict]) {
//println(s"Got text field: $part")
val text = part.entity.asInstanceOf[HttpEntity.Strict].data.utf8String
fields.updated(part.name, text) → pairOpt
} else {
//println(s"IGNORING field: $part")
part.entity.discardBytes()
fields → pairOpt
}
}
.collect {
case (fields, Some((info, stream))) ⇒
//println(s"Completed scanning fields: ${(fields, info, stream)}")
(fields, info, stream)
}
.runWith(Sink.headOption[(Map[String, String], FileInfo, Source[ByteString, Any])])
onSuccess(fut)
}
}.flatMap {
case Some(tuple) ⇒ provide(tuple)
case None ⇒ reject(MissingFormFieldRejection(fieldName))
}