Rewriting SQL with the Apache Flink 1.7.1 CEP Java API gives unexpected output: greedy looping patterns don't work
I am trying to rewrite a SQL query using the Apache Flink 1.7.1 CEP Java API.
The SQL query with MATCH_RECOGNIZE looks like this (taken from here):
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY rowtime
    MEASURES
        START_ROW.rowtime AS start_tstamp,
        LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
        LAST(PRICE_UP.rowtime) AS end_tstamp
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO LAST PRICE_UP
    PATTERN (START_ROW PRICE_DOWN+ PRICE_UP+)
    DEFINE
        PRICE_DOWN AS
            (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
        PRICE_UP AS
            PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
) MR;
The test CSV file contains the following rows:
ACME,2019-01-1,12
ACME,2019-01-2,17
ACME,2019-01-3,19
ACME,2019-01-4,21
ACME,2019-01-5,25
ACME,2019-01-6,12
ACME,2019-01-7,15
ACME,2019-01-8,20
ACME,2019-01-9,24
ACME,2019-01-10,25
ACME,2019-01-11,19
ACME,2019-01-12,15
ACME,2019-01-13,25
ACME,2019-01-14,25
ACME,2019-01-15,14
ACME,2019-01-16,12
ACME,2019-01-17,14
ACME,2019-01-18,24
ACME,2019-01-19,23
ACME,2019-01-20,22
My CEP rewrite looks like this:
final String cvsFile = "table.csv";
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
final StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// START_ROW: no condition, any row can start a match
final Pattern<Ticker, Ticker> startPattern = Pattern.<Ticker>begin("START_ROW");

// PRICE_DOWN: the first row must be cheaper than START_ROW,
// every further row must be cheaper than the previous PRICE_DOWN row
final Pattern<Ticker, Ticker> downPattern = Pattern.<Ticker>begin("PRICE_DOWN")
        .where(new IterativeCondition<Ticker>() {
            @Override
            public boolean filter(Ticker value, IterativeCondition.Context<Ticker> ctx) throws Exception {
                ArrayList<Ticker> downEvents = Lists.newArrayList(ctx.getEventsForPattern("PRICE_DOWN"));
                ArrayList<Ticker> startEvents = Lists.newArrayList(ctx.getEventsForPattern("START_ROW"));
                if (downEvents.isEmpty()) {
                    if (value.getPrice() < startEvents.get(0).getPrice()) {
                        return true;
                    } else {
                        return false;
                    }
                }
                return value.getPrice() < downEvents.get(downEvents.size() - 1).getPrice();
            }
        }).oneOrMore().greedy();

// PRICE_UP: the first row is always accepted,
// every further row must be more expensive than the previous PRICE_UP row.
// The skip strategy is meant to mirror AFTER MATCH SKIP TO LAST PRICE_UP.
final Pattern<Ticker, Ticker> upPattern = Pattern
        .<Ticker>begin("PRICE_UP", AfterMatchSkipStrategy.skipToLast("PRICE_UP"))
        .where(new IterativeCondition<Ticker>() {
            @Override
            public boolean filter(Ticker value, IterativeCondition.Context<Ticker> ctx) throws Exception {
                ArrayList<Ticker> upEvents = Lists.newArrayList(ctx.getEventsForPattern("PRICE_UP"));
                if (upEvents.isEmpty()) {
                    return true;
                }
                return value.getPrice() > upEvents.get(upEvents.size() - 1).getPrice();
            }
        }).oneOrMore().greedy();

// START_ROW PRICE_DOWN+ PRICE_UP+ with strict contiguity (next)
Pattern<Ticker, Ticker> pattern = startPattern.next(downPattern).next(upPattern);
// Parse each "symbol,date,price" line into a Ticker
DataStream<Ticker> ds = executionEnvironment.readTextFile(cvsFile).map(new MapFunction<String, Ticker>() {
    @Override
    public Ticker map(String value) throws Exception {
        String[] split = value.split(",");
        return new Ticker(split[0], new Timestamp(sdf.parse(split[1]).getTime()), Long.valueOf(split[2]));
    }
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Ticker>() {
    // Event time is taken from the date column (rows are in ascending date order)
    @Override
    public long extractAscendingTimestamp(Ticker t) {
        return t.tstamp.getTime();
    }
}).keyBy(new KeySelector<Ticker, String>() {
    // Equivalent of PARTITION BY symbol
    @Override
    public String getKey(Ticker t) throws Exception {
        return t.symbol;
    }
});

DataStream<String> ps = CEP.pattern(ds, pattern).select(new PatternSelectFunction<Ticker, String>() {
    // Emits start_tstamp # bottom_tstamp # end_tstamp, as in the MEASURES clause
    @Override
    public String select(Map<String, List<Ticker>> pattern) throws Exception {
        List<Ticker> start = pattern.get("START_ROW");
        List<Ticker> down = pattern.get("PRICE_DOWN");
        List<Ticker> up = pattern.get("PRICE_UP");
        return sdf.format(start.get(0).tstamp) + " # " + sdf.format(down.get(down.size() - 1).tstamp) + " # "
                + sdf.format(up.get(up.size() - 1).tstamp);
    }
});
ps.print().setParallelism(1);
executionEnvironment.execute();
The Ticker class:
class Ticker {
    String symbol;
    Timestamp tstamp;
    long price;

    public Ticker(String symbol, Timestamp tstamp, long price) {
        this.symbol = symbol;
        this.tstamp = tstamp;
        this.price = price;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public Timestamp getTstamp() {
        return tstamp;
    }

    public void setTstamp(Timestamp tstamp) {
        this.tstamp = tstamp;
    }

    public long getPrice() {
        return price;
    }

    public void setPrice(long price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Ticker [symbol=" + symbol + ", tstamp=" + tstamp + ", price=" + price + "]";
    }
}
Actual output:
2019-01-05 # 2019-01-06 # 2019-01-07
2019-01-05 # 2019-01-06 # 2019-01-08
2019-01-05 # 2019-01-06 # 2019-01-09
2019-01-05 # 2019-01-06 # 2019-01-10
2019-01-10 # 2019-01-12 # 2019-01-13
2019-01-11 # 2019-01-12 # 2019-01-13
2019-01-10 # 2019-01-16 # 2019-01-17
2019-01-11 # 2019-01-16 # 2019-01-17
2019-01-14 # 2019-01-16 # 2019-01-17
2019-01-15 # 2019-01-16 # 2019-01-17
2019-01-10 # 2019-01-16 # 2019-01-18
2019-01-11 # 2019-01-16 # 2019-01-18
2019-01-14 # 2019-01-16 # 2019-01-18
2019-01-15 # 2019-01-16 # 2019-01-18
The expected output is:
2019-01-05 # 2019-01-06 # 2019-01-10
2019-01-10 # 2019-01-12 # 2019-01-13
2019-01-14 # 2019-01-16 # 2019-01-18
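As a cross-check of the expected output, here is a plain-Java sketch (no Flink involved) that replays the CSV rows with the same conditions as the Java IterativeConditions above, not the SQL DEFINE clause: PRICE_DOWN must drop below START_ROW and then below the previous PRICE_DOWN; the first PRICE_UP is always accepted and each further one must exceed the previous PRICE_UP. It assumes that a greedy loop ends only when the next row fails its condition (a loop still open at the end of the input yields no match, as in a stream that may continue), and that after each match the search restarts at the last PRICE_UP row, which is my reading of AFTER MATCH SKIP TO LAST PRICE_UP. The class and method names are hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: replays START_ROW PRICE_DOWN+ PRICE_UP+ over the rows with
// greedy loops and "skip to last PRICE_UP" after every match. Not Flink code.
public class GreedyMatchSketch {

    // Same rows as table.csv: symbol,date,price
    static final List<String> CSV = Arrays.asList(
            "ACME,2019-01-1,12", "ACME,2019-01-2,17", "ACME,2019-01-3,19", "ACME,2019-01-4,21",
            "ACME,2019-01-5,25", "ACME,2019-01-6,12", "ACME,2019-01-7,15", "ACME,2019-01-8,20",
            "ACME,2019-01-9,24", "ACME,2019-01-10,25", "ACME,2019-01-11,19", "ACME,2019-01-12,15",
            "ACME,2019-01-13,25", "ACME,2019-01-14,25", "ACME,2019-01-15,14", "ACME,2019-01-16,12",
            "ACME,2019-01-17,14", "ACME,2019-01-18,24", "ACME,2019-01-19,23", "ACME,2019-01-20,22");

    // Accepts non-padded days such as "2019-01-1"
    private static final DateTimeFormatter INPUT_DATE = DateTimeFormatter.ofPattern("uuuu-M-d");

    static List<String> findMatches(List<String> lines) {
        int n = lines.size();
        LocalDate[] dates = new LocalDate[n];
        long[] prices = new long[n];
        for (int k = 0; k < n; k++) {
            String[] split = lines.get(k).split(",");
            dates[k] = LocalDate.parse(split[1], INPUT_DATE);
            prices[k] = Long.parseLong(split[2]);
        }

        List<String> out = new ArrayList<>();
        int start = 0;
        while (start < n) {
            int[] m = tryMatch(prices, start);
            if (m == null) {
                start++;                // no complete match from this START_ROW, try the next row
            } else {
                out.add(dates[start] + " # " + dates[m[0]] + " # " + dates[m[1]]);
                start = m[1];           // skip to the last PRICE_UP, which may start the next match
            }
        }
        return out;
    }

    // Returns {lastDownIndex, lastUpIndex}, or null if no complete match starts at 'start'.
    private static int[] tryMatch(long[] p, int start) {
        int n = p.length;
        int j = start + 1;
        // The first PRICE_DOWN must be cheaper than START_ROW
        if (j >= n || p[j] >= p[start]) {
            return null;
        }
        int lastDown = j++;
        // Greedy PRICE_DOWN+: keep consuming while the price keeps falling
        while (j < n && p[j] < p[lastDown]) {
            lastDown = j++;
        }
        if (j >= n) {
            return null;                // PRICE_DOWN+ still open at the end of the input
        }
        int lastUp = j++;               // the first PRICE_UP is always accepted
        // Greedy PRICE_UP+: keep consuming while the price keeps rising
        while (j < n && p[j] > p[lastUp]) {
            lastUp = j++;
        }
        if (j >= n) {
            return null;                // PRICE_UP+ still open at the end of the input
        }
        return new int[] {lastDown, lastUp};
    }

    public static void main(String[] args) {
        findMatches(CSV).forEach(System.out::println);
    }
}
```

Under these assumptions, main prints exactly the three expected lines listed above. Because the search resumes at the last PRICE_UP row, 2019-01-10 both ends the first match and starts the second.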
It looks like greedy mode does not work. Why?