Несколько выходных тегов в потоке обрабатывают функцию с ClassCastException - PullRequest
0 голосов
/ 10 мая 2019

Скажем, я хочу получить двухсторонний вывод из потока. Это хорошо сформулировано Когда есть только один побочный вывод, но когда я хочу получить множественный побочный вывод, Flink выдал ClassCastException.

val input: DataStream[CanalModel] = see.addSource(consumer).name("FlinkKafkaConsumer")
val output1: OutputTag[CanalModel] = OutputTag[CanalModel]("store_goods")
val output2: OutputTag[CanalModel] = OutputTag[CanalModel]("store")

val drugStream = input
  .process(new ProcessFunction[CanalModel, CanalModel] {
    override def processElement(value: CanalModel, ctx: ProcessFunction[CanalModel, CanalModel]#Context, out: Collector[CanalModel]): Unit = {
      if (value != null) {
        if ("goods".equals(value.table)) {
          out.collect(value)
        } else if ("store_goods".equals(value.table)) {
          ctx.output(output1, value)
        } else if ("stores".equals(value.table)) {
          ctx.output(output2, value)
        }
      }
    }
  })

Флинк бросил исключение ниже:

java.lang.ClassCastException: scala.collection.immutable.HashMap$HashTrieMap cannot be cast to cn.uniondrug.analysis.model.StoreGoodsModel. Failed to push OutputTag with id 'store_goods' to operator. This can occur when multiple OutputTags with different types but identical names are being used.

Как решить проблему?Спасибо.

...