Предположим, у меня есть поток торговых событий на фондовом рынке, например:
technical1, ALXN, 1/1/2016
technical1, CELG, 1/1/2016
technical2, ALXN, 1/2/2016
technical2, CELG, 1/2/2016
. . .
technicalN, ALXN, 4/1/2018
technicalN, CELG, 4/1/2018
, такой, что technicalN (где N - некоторое число) представляет N-ю техническую торговую запись [Open (float), High (float), Low (float), Close (float), Volume (int)] ежедневных данных по торговле на фондовом рынке на конец дня для данной компании.(т. е. технические1 для тикера GOOG отличаются от технических1 для тикера MSFT)создать окно размера 2 с интервалом в 1 день, чтобы наши данные выглядели примерно так:
[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 12/30/2017]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 12/30/2017]
[technical5, GOOG, 12/30/2017; technical6, GOOG, 12/31/2017]
[technical5, MSFT, 12/30/2017; technical6, MSFT, 12/31/2017]
[technical6, GOOG, 12/31/2017; technical7, GOOG, 01/01/2018]
[technical6, MSFT, 12/31/2017; technical7, MSFT, 01/01/2018]
[technical7, GOOG, 01/01/2018; technical8, GOOG, 01/02/2018]
[technical7, MSFT, 01/01/2018; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
. . .
Это было бы неплохо, но это проблематично, потому что даты торгов на фондовом рынке не являются непрерывными.Другими словами, если я правильно понимаю механику Флинка (и я могу ошибаться), проблема с использованием скользящего окна времени-события, подобного этому:
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy((TechnicalDataEntry technical) -> technical.ticker)
.window(SlidingEventTimeWindows.of(Time.day(2), Time.day(1))) // Window size of 2 days, sliding interval of 1 day
.<windowed transformation>(<window function>);
на таких данных, заключается в том, чтозначения даты не являются непрерывными (это означает, что они следуют за дискретным рядом, содержащим разрывы одного или нескольких пропущенных дней) , поскольку нет данных о фондовом рынке для дат, в которые фондовый рынок закрыт, например, в праздничные или выходные дни .Таким образом, с учетом этого наш поток будет выглядеть примерно так (потому что торги закрыты 30.12.2017, 31.12.2017 и 01.01.2017):
[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; NULL]
[technical4, MSFT, 12/29/2017; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; technical8, GOOG, 01/02/2018]
[NULL; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
Как мне заставить мой поток Flink игнорировать пропущенные даты (и вместо окна или объединять или отображать вместе последовательные не пропущенные даты), чтобы мой поток выглядел следующим образом:
[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 01/02/2018]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 01/02/2018]
[technical5, GOOG, 01/02/2018; technical6, GOOG, 01/03/2018]
[technical5, MSFT, 01/02/2018; technical6, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
?
(Примечание: пожалуйста, не обращайте внимания на то, как я увеличиваю число на строку «технические» (например, технические1, технические2 и т. Д.), Потому что, как я уже упоминал, это значение использовалось только в описательных целях.и фактически не существует в данных. Единственный способ определить, являются ли две торговые записи последовательными, состоит в том, чтобы сгруппировать их по тикеру и упорядочить по торговой дате. Предположим, что повторяющихся событий не существует.)