How do Apache Flink CEP greedy patterns work?
0 votes
/ 27 January 2019

I am trying to rewrite a SQL query with the Apache Flink 1.7.1 CEP Java API, but the output is unexpected: the "greedy looping patterns" do not seem to work.


The SQL with MATCH_RECOGNIZE looks like this (taken from here):

SELECT *
FROM Ticker
MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY rowtime
    MEASURES
        START_ROW.rowtime AS start_tstamp,
        LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
        LAST(PRICE_UP.rowtime) AS end_tstamp
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO LAST PRICE_UP
    PATTERN (START_ROW PRICE_DOWN+ PRICE_UP+)
    DEFINE
        PRICE_DOWN AS
            (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
        PRICE_UP AS
            PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
    ) MR;

The contents of the test CSV file are as follows:

ACME,2019-01-1,12
ACME,2019-01-2,17
ACME,2019-01-3,19
ACME,2019-01-4,21
ACME,2019-01-5,25
ACME,2019-01-6,12
ACME,2019-01-7,15
ACME,2019-01-8,20
ACME,2019-01-9,24
ACME,2019-01-10,25
ACME,2019-01-11,19
ACME,2019-01-12,15
ACME,2019-01-13,25
ACME,2019-01-14,25
ACME,2019-01-15,14
ACME,2019-01-16,12
ACME,2019-01-17,14
ACME,2019-01-18,24
ACME,2019-01-19,23
ACME,2019-01-20,22

My rewritten CEP code is as follows:

final String cvsFile = "table.csv";
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
final StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

final Pattern<Ticker, Ticker> startPattern = Pattern.<Ticker>begin("START_ROW");
final Pattern<Ticker, Ticker> downPattern = Pattern.<Ticker>begin("PRICE_DOWN")
        .where(new IterativeCondition<Ticker>() {
            @Override
            public boolean filter(Ticker value, IterativeCondition.Context<Ticker> ctx) throws Exception {
                ArrayList<Ticker> downEvents = Lists.newArrayList(ctx.getEventsForPattern("PRICE_DOWN"));
                ArrayList<Ticker> startEvents = Lists.newArrayList(ctx.getEventsForPattern("START_ROW"));

                if (downEvents.isEmpty()) {
                    // first PRICE_DOWN event: compare against the START_ROW price
                    return value.getPrice() < startEvents.get(0).getPrice();
                }

                return value.getPrice() < downEvents.get(downEvents.size() - 1).getPrice();
            }
        }).oneOrMore().greedy();
final Pattern<Ticker, Ticker> upPattern = Pattern
        .<Ticker>begin("PRICE_UP", AfterMatchSkipStrategy.skipToLast("PRICE_UP"))
        .where(new IterativeCondition<Ticker>() {
            @Override
            public boolean filter(Ticker value, IterativeCondition.Context<Ticker> ctx) throws Exception {
                ArrayList<Ticker> upEvents = Lists.newArrayList(ctx.getEventsForPattern("PRICE_UP"));
                if (upEvents.isEmpty()) {
                    return true;
                }

                return value.getPrice() > upEvents.get(upEvents.size() - 1).getPrice();
            }
        }).oneOrMore().greedy();

Pattern<Ticker, Ticker> pattern = startPattern.next(downPattern).next(upPattern);

DataStream<Ticker> ds = executionEnvironment.readTextFile(cvsFile).map(new MapFunction<String, Ticker>() {
    @Override
    public Ticker map(String value) throws Exception {
        String[] split = value.split(",");
        return new Ticker(split[0], new Timestamp(sdf.parse(split[1]).getTime()), Long.valueOf(split[2]));
    }
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Ticker>() {
    @Override
    public long extractAscendingTimestamp(Ticker t) {
        return t.tstamp.getTime();
    }
}).keyBy(new KeySelector<Ticker, String>() {
    @Override
    public String getKey(Ticker t) throws Exception {
        return t.symbol;
    }
});

DataStream<String> ps = CEP.pattern(ds, pattern).select(new PatternSelectFunction<Ticker, String>() {
    @Override
    public String select(Map<String, List<Ticker>> pattern) throws Exception {
        List<Ticker> start = pattern.get("START_ROW");
        List<Ticker> down = pattern.get("PRICE_DOWN");
        List<Ticker> up = pattern.get("PRICE_UP");

        return sdf.format(start.get(0).tstamp) + " # " + sdf.format(down.get(down.size() - 1).tstamp) + " # "
                + sdf.format(up.get(up.size() - 1).tstamp);
    }
});

ps.print().setParallelism(1);

executionEnvironment.execute();

The Ticker class:

class Ticker {

    String symbol;
    Timestamp tstamp;
    long price;

    public Ticker(String symbol, Timestamp tstamp, long price) {
        this.symbol = symbol;
        this.tstamp = tstamp;
        this.price = price;
    }

    public String getSymbol() {
        return symbol;
    }
    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }
    public Timestamp getTstamp() {
        return tstamp;
    }
    public void setTstamp(Timestamp tstamp) {
        this.tstamp = tstamp;
    }
    public long getPrice() {
        return price;
    }
    public void setPrice(long price) {
        this.price = price;
    }
    @Override
    public String toString() {
        return "Ticker [symbol=" + symbol + ", tstamp=" + tstamp + ", price=" + price + "]";
    }
}

The output:

2019-01-05 # 2019-01-06 # 2019-01-07
2019-01-05 # 2019-01-06 # 2019-01-08
2019-01-05 # 2019-01-06 # 2019-01-09
2019-01-05 # 2019-01-06 # 2019-01-10
2019-01-10 # 2019-01-12 # 2019-01-13
2019-01-11 # 2019-01-12 # 2019-01-13
2019-01-10 # 2019-01-16 # 2019-01-17
2019-01-11 # 2019-01-16 # 2019-01-17
2019-01-14 # 2019-01-16 # 2019-01-17
2019-01-15 # 2019-01-16 # 2019-01-17
2019-01-10 # 2019-01-16 # 2019-01-18
2019-01-11 # 2019-01-16 # 2019-01-18
2019-01-14 # 2019-01-16 # 2019-01-18
2019-01-15 # 2019-01-16 # 2019-01-18

The expected output should be:

2019-01-05 # 2019-01-06 # 2019-01-10
2019-01-10 # 2019-01-12 # 2019-01-13
2019-01-14 # 2019-01-16 # 2019-01-18
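
For reference, the expected rows above correspond to greedy matching with AFTER MATCH SKIP TO LAST PRICE_UP: take the longest strictly decreasing run after the start row, then the longest strictly increasing run, emit one match, and resume at the last PRICE_UP row. A plain-Java sketch of that semantics (a hypothetical helper of my own, independent of Flink; days are 1-based indices into the price list) reproduces exactly the three expected matches on the sample prices:

```java
import java.util.ArrayList;
import java.util.List;

public class GreedyVMatch {

    /** Returns matches as int[]{startDay, bottomDay, endDay}, days 1-based. */
    static List<int[]> findMatches(long[] p) {
        List<int[]> out = new ArrayList<>();
        int i = 0;
        while (i < p.length) {
            // PRICE_DOWN+ (greedy): strictly decreasing run,
            // whose first row is below the START_ROW price p[i]
            int d = i + 1;
            while (d < p.length && p[d] < p[d - 1]) d++;
            int lastDown = d - 1;
            if (lastDown < i + 1) { i++; continue; }      // no PRICE_DOWN event
            // PRICE_UP+ (greedy): strictly increasing run after the bottom
            int u = lastDown + 1;
            while (u < p.length && p[u] > p[u - 1]) u++;
            int lastUp = u - 1;
            if (lastUp < lastDown + 1) { i++; continue; } // no PRICE_UP event
            out.add(new int[]{i + 1, lastDown + 1, lastUp + 1});
            i = lastUp;                                   // SKIP TO LAST PRICE_UP
        }
        return out;
    }
}
```

On the 20 sample prices this yields (5, 6, 10), (10, 12, 13) and (14, 16, 18), i.e. precisely the three expected rows.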

It seems that greedy mode is not working. Why?
