Я пытаюсь настроить очень простую работу Flink.Когда я пытаюсь запустить, получаю следующую ошибку:
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1535)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:53)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.test.flink.jobs.TestJobRunnable$.run(TestJob.scala:223)
Ошибка вызвана кодом ниже:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
streamExecutionEnvironment.execute("Test Job")
Ошибка исчезает, когда я добавляю вызов print()
до конца потока:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
messageStream.print()
streamExecutionEnvironment.execute("Test Job")
Я не понимаю, почему print()
решает эту проблему.Является ли идея, что потоковая топология не обрабатывает ни одного из своих операторов, пока не будет введен приемник?print()
действует здесь как раковина?Любая помощь будет оценена.Спасибо.