Можно ли использовать потоковый процессор Flink между потоками Akka Streams в качестве источника и приемника? - PullRequest
0 голосов
/ 23 апреля 2019

Я хочу заменить одну часть моего потокового процессора Akka Streams на Flink. Возможно ли в настоящее время использовать Akka Streams в качестве источника для Flink, а затем Flink в качестве источника для Akka Streams в той же кодовой базе?

Текущий поток с Akka Streams выглядит следующим образом:

 // Kafka Source -> Read Flow involving Azure cosmosDB -> Evaluate Flow -> Write Flow -> Logger Flow -> Sink
  lazy private val converterGraph: Graph[ClosedShape.type, Future[Done]] =
    GraphDSL.create(sink) { implicit builder => out =>
      source.watchTermination()(Keep.none) ~> prepareFlow ~> readFlow ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out
      ClosedShape
  }

Потоки выше определены так:

def prepareFlow: Flow[FromSource, ToRead, NotUsed]

def readFlow: Flow[ToRead, ToEvaluate, NotUsed]

Теперь вместо readFlow, являющегося потоком Akka, я хотел бы заменить его на потоковый процессор Flink. Таким образом, вывод prepareFlow будет входом для основанного на Flink readFlow, а вывод этого будет вводиться в evaluateFlow.

В принципе, возможно ли сделать что-то вроде этого:

  prepareFlow ~> [Flink source ->read -> result] ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out

Я вижу, что в Apache Bahir есть соединитель Flink Akka (Sink) , но не уверен, что его можно использовать только с актерами Akka или также с потоками.

1 Ответ

0 голосов
/ 23 апреля 2019

Вы можете обернуть свои prepareFlow показания из CosmosDB как пользовательский миг Source (расширяя SourceFunction) и обернуть весь поток вычисления-записи-записи как пользовательский SinkFunction.

Поскольку сам Flink распространяется, то вы интегрируете akka-stream в задание Flink, но не наоборот.Основные проблемы, которые я вижу при таком подходе, заключаются в том, что у akka-stream было обратное давление из коробки, но сам Flink в основном блокирует.Например, метод SourceFunction.run () требует наличия внутреннего бесконечного цикла, генерирующего сообщения на каждой итерации, поэтому вам нужно заблокировать его там, чтобы дождаться, пока akka-stream создаст следующее сообщение.

...