У меня проблема с использованием стратегии пропуска при разработке на FLink CEP 1.7.1. Вот мой код,
public class CepSurf {
private static final Logger LOG = LoggerFactory.getLogger(CepSurf.class);
public static void main (String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Tuple3<String, String, Long>> source = env.fromElements(Tuple3.of("b", "b1", 1L),
Tuple3.of("b", "b2", 2L),
Tuple3.of("b", "b3", 3L),
Tuple3.of("c", "c", 4L));
// pattern "b+ c"
Pattern<Tuple3<String, String, Long>, ?> pattern = Pattern.<Tuple3<String, String, Long>>begin("pattern-b", AfterMatchSkipStrategy.skipToFirst("pattern-b")).where(new SimpleCondition<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
if ("b".equals(value.f0)) {
LOG.debug("Found b! {}", value);
return true;
}
else
return false;
}
}).oneOrMore().next("pattern-c").where(new SimpleCondition<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
if ("c".equals(value.f0)) {
LOG.debug("Found c! {}", value);
return true;
}
else
return false;
}
}).within(Time.seconds(1));
PatternStream<Tuple3<String, String, Long>> patternStream = CEP.pattern(source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple3<String, String, Long>>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Tuple3<String, String, Long> lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp - 1000);
}
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long previousElementTimestamp) {
return element.f2;
}
}), pattern);
patternStream.select(new PatternSelectFunction<Tuple3<String,String,Long>, String>() {
@Override
public String select(Map<String, List<Tuple3<String, String, Long>>> pattern) throws Exception {
String phenomenon = pattern.get("pattern-b").stream().map(tuple -> tuple.f1).collect(Collectors.joining("-"));
phenomenon = String.join("-", phenomenon, pattern.get("pattern-c").get(0).f1);
return "MAYDAY" + phenomenon + "MAYDAY";
}
}).print();
try {
env.execute("orderness");
} catch (Exception e) {
e.printStackTrace();
}
}
}
А вывод консоли показан ниже,
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8748] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:45] : Found b! (b,b1,1)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8766] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:45] : Found b! (b,b2,2)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8766] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:45] : Found b! (b,b2,2)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8768] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:45] : Found b! (b,b2,2)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8768] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:45] : Found b! (b,b3,3)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8769] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:45] : Found b! (b,b3,3)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8769] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:45] : Found b! (b,b3,3)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8769] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:45] : Found b! (b,b3,3)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8769] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:45] : Found b! (b,b3,3)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8771] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:55] : Found c! (c,c,4)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8774] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:55] : Found c! (c,c,4)
2019-03-17 18:02:16 [nt to Std. Out (1/1): 8776] DEBUG [com.jd.alchemist.demon.flink.examples.lab.CepSurf:55] : Found c! (c,c,4)
MAYDAYb1-b2-b3-cMAYDAY
Есть два вопроса,
- Когда я использую стратегию
skipToFirst
, я ожидаю результатов как
MAYDAYb1-b2-b3-cMAYDAY
MAYDAYb2-b3-cMAYDAY
MAYDAYb3-cMAYDAY
Но я получил только MAYDAYb1-b2-b3-cMAYDAY
, что несовместимо с Официальный документ Flink. . Что не так с моим кодом?
- Почему вызванные времена
filter
для кортежей b1, b2, b3 и c равны 1, 3, 5 и 3?
Спасибо и высоко ценится.