Я начинаю с Akka Streams, и я хотел бы создать сервер как Stream, который получает Http.IncomingConnection
и отправляет сообщение, полученное на Kafka, как plainSink.
Я объявил свой источник, как указано ниже:
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "localhost", port = "8080")
Затем я хочу извлечь сообщение (String) из тела HttpRequest и, наконец, отправить его String Kafka.Ниже приведен пример потока:
val bindingFuture: Future[Http.ServerBinding] = serverSource
.map(???) //Here, I need to extract the message
.map(message => new ProducerRecord[String, String](topic, message.result(2 seconds)))
.runWith(akka.kafka.scaladsl.Producer.plainSink(producerSettings))
Но я не знаю, как извлечь сообщение.Я хотел бы сделать что-то вроде этого:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(POST, Uri.Path("/publish"), _, _, _) => {
HttpResponse(202, entity = "Message sent to Kafka!")
}
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
Но, используя connection handleWithSyncHandler requestHandler
, я не могу заставить сообщение следовать за процессом потока.И, кроме того, я хотел бы получить любой запрос по /publish
URI или вернуть 404 в другом случае внутри потока.
Возможно ли это сделать?