Я работаю над асинхронным драйвером scala для neo4j (https://github.com/neotypes/neotypes) и пытаюсь реализовать потоковую передачу результатов запроса для Akka Stream.
Класс типов для Akka Stream выглядит следующим образом:
type Stream[T] = Source[T, Future[Unit]]
def init[T](value: () => Future[Option[T]]): AkkaStream.Stream[T] =
Source
.repeat()
.mapAsync[Option[T]](1)(_ => value())
.takeWhile(_.isDefined)
.map(_.get)
.viaMat(Flow[T])((_, _) => Future.successful(()))
У меня также есть следующий крюк завершения:
override def onComplete[T](s: AkkaStream.Stream[T])(f: => Future[Unit]): AkkaStream.Stream[T] =
s.watchTermination() {
(_, done) =>
done.flatMap(_ => f)
}
Основная цель ловушки завершения - изящно закрыть транзакцию после прочтения всех элементов. Приведенный выше код работает, как и ожидалось, при использовании простого Sink
(рабочий пример https://github.com/neotypes/neotypes/blob/master/akka-stream/src/test/scala/neotypes/akkastreams/AkkaStreamSpec.scala),, однако, когда я пытаюсь выполнить потоковую передачу через Akka Http, watchTermination
вызывается преждевременно, что делает транзакцию закрытой в середине процесс.