Объединение потоков в потоках Акка - PullRequest
0 голосов
/ 18 октября 2018

Мы используем Kafka и надеемся использовать интерактивные запросы , чтобы получить доступ к данным в наших государственных магазинах.У нас есть существующий сервис, который использует Akka HTTP для обслуживания REST API, и мы хотели интегрировать интерактивные запросы в поток.

Казалось, что kafka-streams-query будетидеально подходит для этого.Тем не менее, он интегрируется в Akka HTTP, предоставляя свойство route, которое использует низкоуровневый API, который сопоставляется с Flow[HttpRequest, HttpResponse, Any].Весь наш предыдущий код объединяет код с использованием DSL-маршрутизации Akka HTTP.

Я ожидаю, что код, подобный следующему, будет работать, но это не так:

implicit val system:ActorSystem = ActorSystem("web")
implicit val materializer:ActorMaterializer = ActorMaterializer()
implicit val ec = system.dispatcher

val firstRoutes:Route = ... //a route object populated
val lastRoutes:Route = ... //other route object populad

val iqServiceFlow:Flow[HttpRequest, HttpResponse, Any] = ...// code that returns the interactive query service

val firstFlow = Route.handlerFlow(firstRoutes)
val lastFlow = Route.handlerFlow(lastRoutes)

// The following code doesn't work though everything I've seen online suggests it should
val handler = firstFlow via iqServiceFlow via lastFlow

Http().bindAndHandle(handler, "0.0.0.0", 8000)

Как объединить потоки вAkka Streams?

Редактировать : исправлен оператор назначения обработчика.

1 Ответ

0 голосов
/ 18 октября 2018

Для ясности давайте начнем с явного указания всех возвращаемых типов:

val iqServiceFlow: Flow[HttpRequest, HttpResponse, Any] = ...
val firstFlow: Flow[HttpRequest, HttpResponse, NotUsed] = Route.handlerFlow(firstRoutes)
val lastFlow: Flow[HttpRequest, HttpResponse, NotUsed]  = Route.handlerFlow(lastRoutes)

Кроме того, вместо ...

val handler = firstRoutes via iqServiceFlow via lastFlow

... вы, вероятно, имели в виду:

val handler = firstFlow via iqServiceFlow via lastFlow

Чтобы объединить потоки вместе с via, типы входа и выхода должны совпадать: то есть тип вывода первого потока должен совпадать с типом ввода второго потока, искоро.То, что вы пытаетесь сделать со своим обработчиком, таково:

[HttpRequest, HttpResponse] // firstFlow
                   |
                   v
             [HttpRequest, HttpResponse] // iqServiceFlow
                                |
                                v
                          [HttpRequest, HttpResponse] // lastFlow

Тип вывода всех ваших потоков HttpResponse, но все их соответствующие типы ввода HttpRequest, поэтому вы не можете связатьих вместе с via.

Чтобы связать ваши потоки, вам нужна функция, которая каким-то образом преобразует HttpResponse в HttpRequest:

val respToReq: HttpResponse => HttpRequest = ...

. Вы можете создать поток извышеупомянутая функция:

val convertingFlow: Flow[HttpResponse, HttpRequest] = Flow.fromFunction(respToReq)

Теперь вы можете объединить свои потоки в цепочку:

val handler = firstFlow via convertingFlow via iqServiceFlow via convertingFlow via lastFlow

Типы выровнены следующим образом:

[HttpRequest, HttpResponse] // firstFlow
                   |
                   v
             [HttpResponse, HttpRequest] // convertingFlow
                                |
                                v
                           [HttpRequest, HttpResponse] // iqServiceFlow
                                              |
                                              v
                                        [HttpResponse, HttpRequest] // convertingFlow
                                                            |
                                                            v                              
                                                      [HttpRequest, HttpResponse] // lastFlow

Выше предполагается, что вы можетеповторно использовать ту же функцию преобразования / поток.Если это предположение не выполняется, очевидно, вы можете создавать различные функции / потоки преобразования.

...