Я использую AWS Kinesis Data Analytics , чтобы попытаться отобрать наиболее популярные элементы, поступающие через мой поток. Я хотел бы видеть, какие предметы были самыми популярными за прошедший час, и обновлять их каждую минуту. Другими словами, одночасовое скользящее окно, которое обновляется каждую минуту.
Я почти, но не совсем понял, что это работает. Это дает мне скользящее окно продолжительностью один час, которое обновляется каждую минуту, для элементов, которые были включены в последнюю минуту .
Я бы хотел, чтобы любой предмет, который был замечен за последний час, выплевывал со своим счетом каждую минуту, пока его самое последнее событие не выпало из окна.
Что я пробовал:
- Сначала создайте промежуточный кувыркающийся поток,
TUMBLE_STREAM
и вставьте счетчик для каждого item_id
в этот поток с интервалами в одну минуту.
- Во-вторых, запрос содержимого
TUMBLE_STREAM
за один час скользящее окно и вставка результатов в мой целевой поток.
Вот мой SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("count" bigint, "item_id" bigint);
CREATE OR REPLACE STREAM "TUMBLE_STREAM" ("item_id_count" bigint, "item_id" bigint);
CREATE OR REPLACE PUMP "TUMBLE_PUMP" AS INSERT INTO "TUMBLE_STREAM"
SELECT STREAM COUNT(*) AS item_id_count, "item_id"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE), "item_id";
CREATE OR REPLACE PUMP "SLIDE_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM SUM("item_id_count") OVER ONE_HOUR AS "hour_count", "item_id"
FROM "TUMBLE_STREAM"
WINDOW ONE_HOUR AS (
PARTITION BY "item_id"
RANGE INTERVAL '1' HOUR PRECEDING)
ORDER BY "ROWTIME", "hour_count" DESC;
Это производит вывод, подобный этому:
ROWTIME count item_id
2018-05-10 17:41:00.001 1 1
2018-05-10 17:41:00.001 1 2
2018-05-10 17:41:00.001 2 3
2018-05-10 17:41:00.001 10 4
2018-05-10 17:41:00.001 26 5
* one minute later *
2018-05-10 17:42:00.001 4 3
2018-05-10 17:42:00.001 1 6
2018-05-10 17:42:00.001 11 4
2018-05-10 17:42:00.001 3 7
2018-05-10 17:42:00.001 30 5
Я хотел бы получить вывод, похожий на этот:
ROWTIME count item_id
2018-05-10 17:41:00.001 1 1
2018-05-10 17:41:00.001 1 2
2018-05-10 17:41:00.001 2 3
2018-05-10 17:41:00.001 10 4
2018-05-10 17:41:00.001 26 5
* one minute later *
2018-05-10 17:42:00.001 1 1
2018-05-10 17:42:00.001 1 2
2018-05-10 17:42:00.001 4 3
2018-05-10 17:42:00.001 1 6
2018-05-10 17:42:00.001 11 4
2018-05-10 17:42:00.001 3 7
2018-05-10 17:42:00.001 30 5
Новые item_id
s могут появиться в потоке в любое время. Как я могу включить все item_id
с прошедшего часа в вывод моего скользящего запроса?