Вы, похоже, неправильно понимаете API сервиса Lagom.Если вы пытаетесь материализовать поток из тела вашего сервисного вызова, ваш вызов не будет введен;т. е.
def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]
подразумевает, что, когда клиент предоставляет Source[String, NotUsed]
, служба ответит в натуральной форме.Ваш клиент не предоставляет это напрямую;следовательно, ваша подпись, вероятно, должна быть
def stream(): ServiceCall[NotUsed, Source[String, NotUsed]]
Теперь к вашему вопросу ...
Этого на самом деле нет в шаблоне scala giter8, но версия java содержит то, что они называют автономный поток , который делает примерно то, что вы хотите.
В Scala этот код будет выглядеть примерно так ...
override def autonomousStream(): ServiceCall[
Source[String, NotUsed],
Source[String, NotUsed]
] = ServiceCall { hellos => Future {
hellos.mapAsync(8, ...)
}
}
Поскольку ваш вызов несопоставляя поток input , а скорее тему kafka, вы захотите сделать что-то вроде этого:
override def stream(): ServiceCall[NotUsed, Source[String, NotUsed]] = ServiceCall {
_ =>
Future {
kafkaTopic()
.subscribe
.atMostOnce
.mapAsync(...)
}
}