Я получаю следующее исключение при запуске моей маленькой программы flink. Приложение имеет два потока данных, поступающих из одного и того же источника. Имеет состояние трансляции. Я написал это для тестирования производительности, но выдаю исключения
Caused by: java.lang.UnsupportedOperationException: Cannot override partitioning for KeyedStream.
at org.apache.flink.streaming.api.datastream.KeyedStream.setConnectionType(KeyedStream.java:251)
at org.apache.flink.streaming.api.datastream.DataStream.broadcast(DataStream.java:429)
at org.apache.flink.streaming.api.scala.DataStream.broadcast(DataStream.scala:495)
мой код:
val testStream: DataStream[Tuple2[String, String]] = env
.addSource(
new MockKafkaSource
)
.filter(x => !x._1.equals("x"))
.map(x => x)
.uid("test stream 1")
val testStream2: DataStream[Tuple2[String, String]] = env
.addSource(
new MockKafkaSource
)
.map(x => x)
.keyBy(x => x._1)
.uid("test stream 2")
lazy val testStateDescriptor =
new MapStateDescriptor("testState", classOf[String], classOf[Tuple2[String, String]])
val testBroadcastStream = testStream.broadcast(testStateDescriptor)
val broadcastOutStream: DataStream[Tuple2[String, String]] =
testStream2
.connect(testBroadcastStream)
.process(new StateProcess)
broadcastOutStream.print()
Исключение происходит в этой строке:
val testBroadcastStream = testStream.broadcast(testStateDescriptor)