Создать исходный код от Lagom / Akka Kafka Subscriber для Websocket - PullRequest
0 голосов
/ 13 сентября 2018

Я хочу, чтобы моя служба только для подписчиков Lagom подписывалась на тему Kafka и передавала сообщения в веб-сокет.У меня есть служба, определенная следующим образом с использованием этой документации (https://www.lagomframework.com/documentation/1.4.x/scala/MessageBrokerApi.html#Subscribe-to-a-topic) в качестве руководства:

    // service call
    def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]

    // service implementation
    override def stream() = ServiceCall { req =>
      req.runForeach(str => log.info(s"client: %str"))
      kafkaTopic().subscribe.atLeastOnce(Flow.fromFunction(
        // add message to a Source and return Done
      ))
      Future.successful(//some Source[String, NotUsed])

Однако я не могу понять, как обработать мое сообщение kafka. Flow.fromFunctionвозвращает [String, Done, _] и подразумевает, что мне нужно добавить эти сообщения (строки) в источник, созданный вне подписчика.

Итак, мой вопрос состоит из двух частей: 1) Как создать источник потока akkaполучать сообщения от подписчика темы кафки во время выполнения?2) Как добавить сообщения kafka к указанному источнику, находясь в потоке?

1 Ответ

0 голосов
/ 14 сентября 2018

Вы, похоже, неправильно понимаете API сервиса Lagom.Если вы пытаетесь материализовать поток из тела вашего сервисного вызова, ваш вызов не будет введен;т. е.

def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]

подразумевает, что, когда клиент предоставляет Source[String, NotUsed], служба ответит в натуральной форме.Ваш клиент не предоставляет это напрямую;следовательно, ваша подпись, вероятно, должна быть

def stream(): ServiceCall[NotUsed, Source[String, NotUsed]]

Теперь к вашему вопросу ...

Этого на самом деле нет в шаблоне scala giter8, но версия java содержит то, что они называют автономный поток , который делает примерно то, что вы хотите.

В Scala этот код будет выглядеть примерно так ...

override def autonomousStream(): ServiceCall[
  Source[String, NotUsed], 
  Source[String, NotUsed]
] = ServiceCall { hellos => Future {
    hellos.mapAsync(8, ...)
  }
}

Поскольку ваш вызов несопоставляя поток input , а скорее тему kafka, вы захотите сделать что-то вроде этого:

override def stream(): ServiceCall[NotUsed, Source[String, NotUsed]] = ServiceCall { 
  _ => 
    Future {
      kafkaTopic()
        .subscribe
        .atMostOnce
        .mapAsync(...)
    }
}
...