Из объявления о выпуске Flink 1.5 мы знаем, что Flink теперь поддерживает «состояние широковещания», и было описано, что «состояние широковещания разблокирует реализацию функции« динамические шаблоны »для библиотеки CEP Flink».
Означает ли это, что в настоящее время мы можем использовать «широковещательное состояние» для реализации «динамических шаблонов» без Flink CEP?Также я понятия не имею, в чем разница при реализации «динамических шаблонов» для Flink CEP с или без состояния широковещания?Я был бы признателен, если кто-то может привести пример с кодом, чтобы объяснить разницу.
=============
Обновление для тестирования данных трансляции-поток оператором broadcast () с ключевым потоком данных
После тестирования в Flink 1.4.2 я обнаружил, что поток широковещательных данных (по старому оператору broadcast ()) может соединяться с потоком ключевых данных, ниже приведен тесткод, и мы нашли все события потока управления, передаваемые всем экземплярам оператора.Таким образом, кажется, что старая широковещательная рассылка () может выполнять те же функции, что и новое «широковещательное состояние».
public static void ConnectBroadToKeyedStream() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
List<Tuple1<String>>
controlData = new ArrayList<Tuple1<String>>();
controlData.add(new Tuple1<String>("DROP"));
controlData.add(new Tuple1<String>("IGNORE"));
DataStream<Tuple1<String>> control = env.fromCollection(controlData);//.keyBy(0);
List<Tuple1<String>>
dataStreamData = new ArrayList<Tuple1<String>>();
dataStreamData.add(new Tuple1<String>("data"));
dataStreamData.add(new Tuple1<String>("DROP"));
dataStreamData.add(new Tuple1<String>("artisans"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
// DataStream<String> data2 = env.fromElements("data", "DROP", "artisans", "IGNORE");
DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);
DataStream<String> result = control
.broadcast()
.connect(keyedDataStream)
.flatMap(new MyCoFlatMap());
result.print();
env.execute();
}
private static final class MyCoFlatMap
implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> {
HashSet blacklist = new HashSet();
@Override
public void flatMap1(Tuple1<String> control_value, Collector<String> out) {
blacklist.add(control_value);
out.collect("listed " + control_value);
}
@Override
public void flatMap2(Tuple1<String> data_value, Collector<String> out) {
if (blacklist.contains(data_value)) {
out.collect("skipped " + data_value);
} else {
out.collect("passed " + data_value);
}
}
}
Ниже приведен результат теста.
1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)
https://data-artisans.com/blog/apache-flink-1-5-0-release-announcement