Скажем, я хочу получить двухсторонний вывод из потока. Это хорошо сформулировано Когда есть только один побочный вывод, но когда я хочу получить множественный побочный вывод, Flink выдал ClassCastException.
val input: DataStream[CanalModel] = see.addSource(consumer).name("FlinkKafkaConsumer")
val output1: OutputTag[CanalModel] = OutputTag[CanalModel]("store_goods")
val output2: OutputTag[CanalModel] = OutputTag[CanalModel]("store")
val drugStream = input
.process(new ProcessFunction[CanalModel, CanalModel] {
override def processElement(value: CanalModel, ctx: ProcessFunction[CanalModel, CanalModel]#Context, out: Collector[CanalModel]): Unit = {
if (value != null) {
if ("goods".equals(value.table)) {
out.collect(value)
} else if ("store_goods".equals(value.table)) {
ctx.output(output1, value)
} else if ("stores".equals(value.table)) {
ctx.output(output2, value)
}
}
}
})
Флинк бросил исключение ниже:
java.lang.ClassCastException: scala.collection.immutable.HashMap$HashTrieMap cannot be cast to cn.uniondrug.analysis.model.StoreGoodsModel. Failed to push OutputTag with id 'store_goods' to operator. This can occur when multiple OutputTags with different types but identical names are being used.
Как решить проблему?Спасибо.