Я хочу заменить одну часть моего потокового процессора Akka Streams на Flink. Возможно ли в настоящее время использовать Akka Streams в качестве источника для Flink, а затем Flink в качестве источника для Akka Streams в той же кодовой базе?
Текущий поток с Akka Streams выглядит следующим образом:
// Kafka Source -> Read Flow involving Azure cosmosDB -> Evaluate Flow -> Write Flow -> Logger Flow -> Sink
lazy private val converterGraph: Graph[ClosedShape.type, Future[Done]] =
GraphDSL.create(sink) { implicit builder => out =>
source.watchTermination()(Keep.none) ~> prepareFlow ~> readFlow ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out
ClosedShape
}
Потоки выше определены так:
def prepareFlow: Flow[FromSource, ToRead, NotUsed]
def readFlow: Flow[ToRead, ToEvaluate, NotUsed]
Теперь вместо readFlow
, являющегося потоком Akka, я хотел бы заменить его на потоковый процессор Flink. Таким образом, вывод prepareFlow
будет входом для основанного на Flink readFlow
, а вывод этого будет вводиться в evaluateFlow
.
В принципе, возможно ли сделать что-то вроде этого:
prepareFlow ~> [Flink source ->read -> result] ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out
Я вижу, что в Apache Bahir есть соединитель Flink Akka (Sink) , но не уверен, что его можно использовать только с актерами Akka или также с потоками.