AWS Kinesis Tumbling-Sliding Window Count (популярный / трендовый счетчик) - PullRequest
0 голосов
/ 10 мая 2018

Я использую AWS Kinesis Data Analytics , чтобы попытаться отобрать наиболее популярные элементы, поступающие через мой поток. Я хотел бы видеть, какие предметы были самыми популярными за прошедший час, и обновлять их каждую минуту. Другими словами, одночасовое скользящее окно, которое обновляется каждую минуту.

Я почти, но не совсем понял, что это работает. Это дает мне скользящее окно продолжительностью один час, которое обновляется каждую минуту, для элементов, которые были включены в последнюю минуту .

Я бы хотел, чтобы любой предмет, который был замечен за последний час, выплевывал со своим счетом каждую минуту, пока его самое последнее событие не выпало из окна.

Что я пробовал:

  • Сначала создайте промежуточный кувыркающийся поток, TUMBLE_STREAM и вставьте счетчик для каждого item_id в этот поток с интервалами в одну минуту.
  • Во-вторых, запрос содержимого TUMBLE_STREAM за один час скользящее окно и вставка результатов в мой целевой поток.

Вот мой SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("count" bigint, "item_id" bigint);

CREATE OR REPLACE STREAM "TUMBLE_STREAM" ("item_id_count" bigint, "item_id" bigint);

CREATE OR REPLACE PUMP "TUMBLE_PUMP" AS INSERT INTO "TUMBLE_STREAM"
SELECT STREAM COUNT(*) AS item_id_count, "item_id"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE), "item_id";

CREATE OR REPLACE PUMP "SLIDE_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM SUM("item_id_count") OVER ONE_HOUR AS "hour_count", "item_id"
FROM "TUMBLE_STREAM"
WINDOW ONE_HOUR AS (
    PARTITION BY "item_id" 
    RANGE INTERVAL '1' HOUR PRECEDING)
ORDER BY "ROWTIME", "hour_count" DESC;

Это производит вывод, подобный этому:

ROWTIME                 count item_id
2018-05-10 17:41:00.001 1     1
2018-05-10 17:41:00.001 1     2
2018-05-10 17:41:00.001 2     3
2018-05-10 17:41:00.001 10    4
2018-05-10 17:41:00.001 26    5
 * one minute later *
2018-05-10 17:42:00.001 4     3
2018-05-10 17:42:00.001 1     6
2018-05-10 17:42:00.001 11    4
2018-05-10 17:42:00.001 3     7
2018-05-10 17:42:00.001 30    5

Я хотел бы получить вывод, похожий на этот:

ROWTIME                 count item_id
2018-05-10 17:41:00.001 1     1
2018-05-10 17:41:00.001 1     2
2018-05-10 17:41:00.001 2     3
2018-05-10 17:41:00.001 10    4
2018-05-10 17:41:00.001 26    5
 * one minute later *
2018-05-10 17:42:00.001 1     1
2018-05-10 17:42:00.001 1     2
2018-05-10 17:42:00.001 4     3
2018-05-10 17:42:00.001 1     6
2018-05-10 17:42:00.001 11    4
2018-05-10 17:42:00.001 3     7
2018-05-10 17:42:00.001 30    5

Новые item_id s могут появиться в потоке в любое время. Как я могу включить все item_id с прошедшего часа в вывод моего скользящего запроса?

...