Flink: java.lang.UnsupportedOperationException: Невозможно переопределить разделение для KeyedStream - PullRequest
0 голосов
/ 01 октября 2019

Я получаю следующее исключение при запуске моей маленькой программы flink. Приложение имеет два потока данных, поступающих из одного и того же источника. Имеет состояние трансляции. Я написал это для тестирования производительности, но выдаю исключения

Caused by: java.lang.UnsupportedOperationException: Cannot override partitioning for KeyedStream.
    at org.apache.flink.streaming.api.datastream.KeyedStream.setConnectionType(KeyedStream.java:251)
    at org.apache.flink.streaming.api.datastream.DataStream.broadcast(DataStream.java:429)
    at org.apache.flink.streaming.api.scala.DataStream.broadcast(DataStream.scala:495)

мой код:

    val testStream: DataStream[Tuple2[String, String]] = env
      .addSource(
        new MockKafkaSource
      )
      .filter(x => !x._1.equals("x"))
      .map(x => x)
      .uid("test stream 1")

    val testStream2: DataStream[Tuple2[String, String]] = env
      .addSource(
        new MockKafkaSource
      )
      .map(x => x)
      .keyBy(x => x._1)
      .uid("test stream 2")

    lazy val testStateDescriptor =
      new MapStateDescriptor("testState", classOf[String], classOf[Tuple2[String, String]])

    val testBroadcastStream = testStream.broadcast(testStateDescriptor)

    val broadcastOutStream: DataStream[Tuple2[String, String]] =
      testStream2
        .connect(testBroadcastStream)
        .process(new StateProcess)

    broadcastOutStream.print()

Исключение происходит в этой строке:

val testBroadcastStream = testStream.broadcast(testStateDescriptor)
...