Сеанс Kinesis Analytics или пакетное окно с разбивкой без агрегирования - PullRequest
0 голосов
/ 21 марта 2020

Я хочу использовать Kinesis Data Analytics (или какую-либо другую управляемую службу AWS) для пакетной обработки записей на основе критериев фильтрации. Идея состояла бы в том, что по мере поступления записей мы запускаем окно сеанса и пакетируем любые подходящие записи в течение 15 минут.

Пошаговое окно - это именно то, что нам нужно, за исключением того, что мы не собираемся собирать данные, а просто возвращаем все записи вместе.

В идеале ...

100 records spread over 15 min. (20 matching criteria) with first one at 10:02
                                 |
                                 v
At 10:17, the 20 matching records would be sent to the destination

Я пытался сделать что-то вроде:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "device_id" INTEGER, 
    "child_id" INTEGER, 
    "domain" VARCHAR(32),
    "category_id" INTEGER,
    "posted_at" DOUBLE,
    "block" TIMESTAMP
);

-- Create pump to insert into output 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"

-- Select all columns from source stream
SELECT STREAM 
    "device_id", 
    "child_id", 
    "domain", 
    "category_id", 
    "posted_at",
    FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) as block
FROM "SOURCE_SQL_STREAM_001"
WHERE "category_id" = 888815186
WINDOWED BY STAGGER (
    PARTITION BY "child_id", FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) 
    RANGE INTERVAL '15' MINUTE);

Я продолжаю получать ошибки для всех столбцов, не входящих в агрегирование:

From line 6, column 5 to line 6, column 12: Expression 'domain' is not being used in PARTITION BY sub clause of WINDOWED BY clause

Kinesis Firehose был предложенным решением, но это слепое окно для всех child_id, поэтому он может сократить количество сеансов до нескольких, и это то, чего я пытаюсь избежать.

Есть предложения? Такое ощущение, что это может быть не тот инструмент.

1 Ответ

0 голосов
/ 08 апреля 2020

try LAST_VALUE("domain") as domain в предложении select.

...