Я хочу выполнить внешнее объединение для 2 потоков данных, и лучше не помещать их в окно (я видел, что Cogroup
всегда поставляется с окном).
Я пробовал это:
val controlStream = Flink.flinkEnv.fromElements(
(1, "mex1", "stream1_feat1"),
(1, "mex2", "stream1_feat2")
).keyBy(x => (x._1, x._2))
val wordStream = Flink.flinkEnv.fromElements(
(1, "mex1", "stream2_feat1"),
(1, "mex3", "stream2_feat3")
).keyBy(x => (x._1, x._2))
val filteredStream = controlStream
.connect(wordStream)
.flatMap(new ControlFunction)
////////////////////////////////////////////////////////////////////////
class ControlFunction extends RichCoFlatMapFunction[
(Int, String, String),
(Int, String, String),
(Int, String, String, String)] {
// outer join
private var state1: ValueState[(Int, String, String)] = _
private var state2: ValueState[(Int, String, String)] = _
override def open(parameters: Configuration): Unit = {
state1 = getRuntimeContext.getState(
new ValueStateDescriptor[(Int, String, String)]("s1", createTypeInformation[(Int, String, String)]))
state2 = getRuntimeContext.getState(
new ValueStateDescriptor[(Int, String, String)]("s2", createTypeInformation[(Int, String, String)]))
}
override def flatMap1(value: (Int, String, String),
out: Collector[(Int, String, String, String)]): Unit = {
val state2Value = state2.value
if (state2Value != null) {
println("inside map1 not null")
state2.clear()
out.collect((value._1, value._2, value._3, state2Value._3))
} else {
println("inside map1 null")
state1.update(value)
out.collect((value._1, value._2, value._3, "NA"))
}
}
override def flatMap2(value: (Int, String, String),
out: Collector[(Int, String, String, String)]): Unit = {
val state1Value = state1.value
if (state1Value != null) {
println("inside map2 not null")
state1.clear()
out.collect((value._1, value._2, state1Value._3, value._3))
} else {
println("inside map2 null")
state2.update(value)
out.collect((value._1, value._2, "NA", value._3))
}
}
}
Что дало мне:
5> (1,mex2,stream1_feat2,NA)
8> (1,mex1,stream1_feat1,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)
Где запись (1,mex1,stream1_feat1,NA)
не должна производиться. В результате я хочу получить внешнее соединение:
5> (1,mex2,stream1_feat2,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)
Получив оператор печати, я обнаружил, что 2 flapMaps
были переданы в последовательности, что привело к тому, что mex1
было произведено дважды, в любом случае решить эту проблему?
Заранее спасибо!