Пользовательская версия директивы fileUpload не может быть реализована - PullRequest
0 голосов
/ 11 октября 2018

Когда пользователь загружает файл в мой веб-сервис, я хотел бы собрать недвоичные поля из запроса POST.Они содержат метаданные загруженного файла.Поэтому я изменил директиву fileUpload akka-http для этой

def fileUpload3(fieldName: String): Directive1[(Map[String, String], FileInfo, Source[ByteString, Any])] =
  entity(as[Multipart.FormData]).flatMap { formData ⇒
    extractRequestContext.flatMap { ctx ⇒
      implicit val mat: Materializer = ctx.materializer

      val fut =
        formData.parts.fold((Map.empty[String, String], Option.empty[(FileInfo, Source[ByteString, Any])])) { case ((fields, pairOpt), part) ⇒
          if (part.filename.nonEmpty && part.name == fieldName) {
            fields → Some((FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes))
          } else if (part.filename.isEmpty && part.entity.contentType.mediaType == MediaTypes.`text/plain` && part.entity.isInstanceOf[HttpEntity.Strict]) {
            fields.updated(part.name, part.entity.asInstanceOf[HttpEntity.Strict].data.utf8String) → pairOpt
          } else {
            fields → pairOpt
          }
        }
          .collect {
            case (fields, Some((info, stream))) ⇒
              (fields, info, stream)
          }
          .runWith(Sink.headOption[(Map[String, String], FileInfo, Source[ByteString, Any])])

      onSuccess(fut)
    }
  }.flatMap {
    case Some(tuple) ⇒ provide(tuple)
    case None ⇒ reject(MissingFormFieldRejection(fieldName))
  }

Хотя я не вижу большой разницы по сравнению с исходным кодом , для меня это не получается, если я использую его со следующим исключением:

akka.stream.AbruptIOTerminationException: Stream terminated without completing IO operation.
Caused by: akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in 5000 milliseconds
    at akka.stream.impl.fusing.SubSource.timeout(StreamOfStreams.scala:746)

Чего мне не хватает, ребята?

1 Ответ

0 голосов
/ 13 октября 2018

Сначала я этого не осознавал, но, поскольку мы получаем все поля из одного непрерывного потока, мы не можем извлечь одно из полей для последующего использования в потоке с помощью Source[T], хотя akka-streams позволяют нам это делать.

Таким образом, каждая часть составного запроса должна быть очищена перед обработкой следующего.

Также обратите внимание, что следующая функция будет собирать только текстовые поля, предшествующие двоичному файлу.

def fileUploadWithFields(fieldName: String): Directive1[(Map[String, String], FileInfo, Source[ByteString, Any])] =
  entity(as[Multipart.FormData]).flatMap { formData ⇒
    extractRequestContext.flatMap { ctx ⇒
      implicit val mat: Materializer = ctx.materializer

      // Because it's continuous stream of fields we MUST consume each field before switching to next one. [https://stackoverflow.com/q/52765993/226895]
      val fut = formData.parts
        .takeWhile(part ⇒ !(part.filename.isDefined && part.name == fieldName), inclusive = true)
        .fold((Map.empty[String, String], Option.empty[(FileInfo, Source[ByteString, Any])])) { case ((fields, pairOpt), part) ⇒
          if (part.filename.nonEmpty && part.name == fieldName) {
            //println(s"Got file field: $part")
            fields → Some((FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes))
          } else if (part.filename.isEmpty && part.entity.contentType.mediaType.isText && part.entity.isInstanceOf[HttpEntity.Strict]) {
            //println(s"Got text field: $part")
            val text = part.entity.asInstanceOf[HttpEntity.Strict].data.utf8String
            fields.updated(part.name, text) → pairOpt
          } else {
            //println(s"IGNORING field: $part")
            part.entity.discardBytes()
            fields → pairOpt
          }
        }
        .collect {
          case (fields, Some((info, stream))) ⇒
            //println(s"Completed scanning fields: ${(fields, info, stream)}")
            (fields, info, stream)
        }
        .runWith(Sink.headOption[(Map[String, String], FileInfo, Source[ByteString, Any])])

      onSuccess(fut)
    }
  }.flatMap {
    case Some(tuple) ⇒ provide(tuple)
    case None ⇒ reject(MissingFormFieldRejection(fieldName))
  }
...