Я хочу использовать Kinesis Data Analytics (или какую-либо другую управляемую службу AWS) для пакетной обработки записей на основе критериев фильтрации. Идея состояла бы в том, что по мере поступления записей мы запускаем окно сеанса и пакетируем любые подходящие записи в течение 15 минут.
Пошаговое окно - это именно то, что нам нужно, за исключением того, что мы не собираемся собирать данные, а просто возвращаем все записи вместе.
В идеале ...
100 records spread over 15 min. (20 matching criteria) with first one at 10:02
|
v
At 10:17, the 20 matching records would be sent to the destination
Я пытался сделать что-то вроде:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"device_id" INTEGER,
"child_id" INTEGER,
"domain" VARCHAR(32),
"category_id" INTEGER,
"posted_at" DOUBLE,
"block" TIMESTAMP
);
-- Create pump to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM
"device_id",
"child_id",
"domain",
"category_id",
"posted_at",
FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) as block
FROM "SOURCE_SQL_STREAM_001"
WHERE "category_id" = 888815186
WINDOWED BY STAGGER (
PARTITION BY "child_id", FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE)
RANGE INTERVAL '15' MINUTE);
Я продолжаю получать ошибки для всех столбцов, не входящих в агрегирование:
From line 6, column 5 to line 6, column 12: Expression 'domain' is not being used in PARTITION BY sub clause of WINDOWED BY clause
Kinesis Firehose был предложенным решением, но это слепое окно для всех child_id
, поэтому он может сократить количество сеансов до нескольких, и это то, чего я пытаюсь избежать.
Есть предложения? Такое ощущение, что это может быть не тот инструмент.