Flink - скользящее окно события-времени с отсутствующими данными в окне из-за промежутков времени - PullRequest
0 голосов
/ 09 октября 2018

Предположим, у меня есть поток торговых событий на фондовом рынке, например:

technical1, ALXN, 1/1/2016
technical1, CELG, 1/1/2016
technical2, ALXN, 1/2/2016
technical2, CELG, 1/2/2016
. . . 
technicalN, ALXN, 4/1/2018
technicalN, CELG, 4/1/2018

, такой, что technicalN (где N - некоторое число) представляет N-ю техническую торговую запись [Open (float), High (float), Low (float), Close (float), Volume (int)] ежедневных данных по торговле на фондовом рынке на конец дня для данной компании.(т. е. технические1 для тикера GOOG отличаются от технических1 для тикера MSFT)создать окно размера 2 с интервалом в 1 день, чтобы наши данные выглядели примерно так:

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 12/30/2017]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 12/30/2017]
[technical5, GOOG, 12/30/2017; technical6, GOOG, 12/31/2017]
[technical5, MSFT, 12/30/2017; technical6, MSFT, 12/31/2017]
[technical6, GOOG, 12/31/2017; technical7, GOOG, 01/01/2018]
[technical6, MSFT, 12/31/2017; technical7, MSFT, 01/01/2018]
[technical7, GOOG, 01/01/2018; technical8, GOOG, 01/02/2018]
[technical7, MSFT, 01/01/2018; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
. . .

Это было бы неплохо, но это проблематично, потому что даты торгов на фондовом рынке не являются непрерывными.Другими словами, если я правильно понимаю механику Флинка (и я могу ошибаться), проблема с использованием скользящего окна времени-события, подобного этому:

DataStream<T> input = ...;

// sliding event-time windows
input
.keyBy((TechnicalDataEntry technical) -> technical.ticker)
.window(SlidingEventTimeWindows.of(Time.day(2), Time.day(1))) // Window size of 2 days, sliding interval of 1 day
.<windowed transformation>(<window function>);

на таких данных, заключается в том, чтозначения даты не являются непрерывными (это означает, что они следуют за дискретным рядом, содержащим разрывы одного или нескольких пропущенных дней) , поскольку нет данных о фондовом рынке для дат, в которые фондовый рынок закрыт, например, в праздничные или выходные дни .Таким образом, с учетом этого наш поток будет выглядеть примерно так (потому что торги закрыты 30.12.2017, 31.12.2017 и 01.01.2017):

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; NULL]
[technical4, MSFT, 12/29/2017; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; technical8, GOOG, 01/02/2018]
[NULL; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]

Как мне заставить мой поток Flink игнорировать пропущенные даты (и вместо окна или объединять или отображать вместе последовательные не пропущенные даты), чтобы мой поток выглядел следующим образом:

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 01/02/2018]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 01/02/2018]
[technical5, GOOG, 01/02/2018; technical6, GOOG, 01/03/2018]
[technical5, MSFT, 01/02/2018; technical6, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]

?

(Примечание: пожалуйста, не обращайте внимания на то, как я увеличиваю число на строку «технические» (например, технические1, технические2 и т. Д.), Потому что, как я уже упоминал, это значение использовалось только в описательных целях.и фактически не существует в данных. Единственный способ определить, являются ли две торговые записи последовательными, состоит в том, чтобы сгруппировать их по тикеру и упорядочить по торговой дате. Предположим, что повторяющихся событий не существует.)

1 Ответ

0 голосов
/ 12 октября 2018

Если я правильно понимаю, ваша проблема в том, что из-за определенных периодов, когда вы не получаете события, окна не будут вести себя должным образом, так как они не знают о времени.

Одинвариант, который вы имеете, состоит в том, чтобы периодически излучать водяной знак следующим образом:

streamEnvironment.addSource(new SourceFunction<Object>() {
        @Override
        public void run(final SourceContext<Object> ctx) {
            (...)

            ctx.emitWatermark(new Watermark(timestamp));
        }

        @Override
        public void cancel() {

        }
    })

Имейте в виду, что, если вы получаете события до водяного знака, они будут игнорироваться, поэтому периодичность выброса водяного знака является компромиссом между«точность окна» (стрельба как можно быстрее) и терпимость к поздним событиям.

...