Как указать два источника, один оператор процесса и один оператор приемника в приложении flink - PullRequest
0 голосов
/ 01 августа 2020

Я использую flink 1.3, я определил два источника потока, которые будут выдавать одни и те же события для обработки последующими операторами (мой определенный оператор процесса и оператор приемника). процесс-розовый конвейер, я мог указать только один источник, я бы спросил, как указать два или более источника и сделать один и тот же процесс и приемник

object FlinkApplication {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.addSource(new MySource1()) //How to MySource2 here?
      .setParallelism(1)
      .name("source1")
      .process(new MyProcess())
      .setParallelism(4)
      .addSink(new MySink())
      .setParallelism(2)
    env.execute("FlinkApplication")
  }

}

1 Ответ

1 голос
/ 01 августа 2020

API обеспечивает большую гибкость относительно того, как вы можете настроить конвейеры обработки. Вы можете сделать это, если хотите применить один и тот же logi c к нескольким источникам:

env.addSource(new MySource1())
  .process(new MyProcess())
  .addSink(new MySink())

env.addSource(new MySource2())
  .process(new MyProcess())
  .addSink(new MySink())

env.execute()

Или, если это имеет больше смысла, вы можете объединить два потока, а затем обработать объединенный поток (или некоторая комбинация этих подходов):

stream1.union(stream2)
  .process(...)
  .addSink(...)

Также можно сделать что-то наоборот, если вы хотите разделить поток и применить различные операции к каждой копии:

val stream: DataStream[T] = env.addSource(new MySource())

stream.process(new MyProcess1())
  .addSink(new MySink1())

stream.process(new MyProcess2())
  .addSink(new MySink2())

env.execute()

И вау, Flink 1.3 больше трех лет!

...