Akka Stream (Akka Http) - watchTermination вызывается перед использованием всех элементов - PullRequest
0 голосов
/ 08 апреля 2019

Я работаю над асинхронным драйвером scala для neo4j (https://github.com/neotypes/neotypes) и пытаюсь реализовать потоковую передачу результатов запроса для Akka Stream.

Класс типов для Akka Stream выглядит следующим образом:

  type Stream[T] = Source[T, Future[Unit]]

  def init[T](value: () => Future[Option[T]]): AkkaStream.Stream[T] =
    Source
      .repeat()
      .mapAsync[Option[T]](1)(_ => value())
      .takeWhile(_.isDefined)
      .map(_.get)
      .viaMat(Flow[T])((_, _) => Future.successful(()))

У меня также есть следующий крюк завершения:

  override def onComplete[T](s: AkkaStream.Stream[T])(f: => Future[Unit]): AkkaStream.Stream[T] =
    s.watchTermination() {
      (_, done) =>
        done.flatMap(_ => f)
    }

Основная цель ловушки завершения - изящно закрыть транзакцию после прочтения всех элементов. Приведенный выше код работает, как и ожидалось, при использовании простого Sink (рабочий пример https://github.com/neotypes/neotypes/blob/master/akka-stream/src/test/scala/neotypes/akkastreams/AkkaStreamSpec.scala),, однако, когда я пытаюсь выполнить потоковую передачу через Akka Http, watchTermination вызывается преждевременно, что делает транзакцию закрытой в середине процесс.

...