Как управлять Http в Akka Stream и отправить сообщение в Kafka? - PullRequest
0 голосов
/ 10 октября 2018

Я начинаю с Akka Streams, и я хотел бы создать сервер как Stream, который получает Http.IncomingConnection и отправляет сообщение, полученное на Kafka, как plainSink.

Я объявил свой источник, как указано ниже:

val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http().bind(interface = "localhost", port = "8080")

Затем я хочу извлечь сообщение (String) из тела HttpRequest и, наконец, отправить его String Kafka.Ниже приведен пример потока:

val bindingFuture: Future[Http.ServerBinding] = serverSource
     .map(???) //Here, I need to extract the message
     .map(message => new ProducerRecord[String, String](topic, message.result(2 seconds)))
     .runWith(akka.kafka.scaladsl.Producer.plainSink(producerSettings))

Но я не знаю, как извлечь сообщение.Я хотел бы сделать что-то вроде этого:

val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(POST, Uri.Path("/publish"), _, _, _) => {
      HttpResponse(202, entity = "Message sent to Kafka!")
    }
    case r: HttpRequest =>
      r.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
  }

Но, используя connection handleWithSyncHandler requestHandler, я не могу заставить сообщение следовать за процессом потока.И, кроме того, я хотел бы получить любой запрос по /publish URI или вернуть 404 в другом случае внутри потока.

Возможно ли это сделать?

1 Ответ

0 голосов
/ 26 октября 2018

Вместо этого используйте директивы

Маршрутизация DSL будет проще в использовании, чем попытка обработать HttpRequest вручную:

val route : Route = 
  post {
    path("publish") {
      extractRequestEntity { entity =>
        onComplete(entity.toStrict(10.seconds).map(_.data.utf8String){ message =>
          Producer.plainSink(producerSettings)(
            new ProducerRecord[String, String](topic, message.result(2 seconds))
          )
          complete(StatusCodes.OK)
        } 
      }
    }
  }

Теперь его можно передавать для обработки входящих запросов:

Http().bindAndHandle(
  route,
  "localhost",
  8080
)
...