Как агрегировать события с помощью AWS Kinesis - PullRequest
0 голосов
/ 24 марта 2019

Я пытаюсь использовать Kinesis для агрегирования событий за X минут и потоковой передачи всех агрегированных событий в лямбда-функцию. Однако я не могу найти способ для потоковой передачи all агрегированных событий с использованием GROUP BY или WINDOW, что заставляет меня применять агрегирующую функцию, которая уменьшает количество результатов, которые я передаю в пункт назначения.

Я пытаюсь использовать Kinesis для реализации пакетирования в моих данных, я не хочу изменять события или применять к ним какую-либо логику. Все, что я хочу, это агрегировать эти события за X минут и затем передавать их вместе в виде пакета.

Я пытался использовать Kinesis Analytics, чтобы выполнить это, используя следующий запрос:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    TICKER_SYMBOL VARCHAR(4), 
    SECTOR varchar(16),
    CHANGE REAL,
    PRICE REAL
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_STREAM"
    SELECT STREAM
        TICKER_SYMBOL,
        SECTOR,
        CHANGE,
        PRICE
    FROM "SOURCE_STREAM" WINDOW W1 AS (RANGE INTERVAL '30' SECOND PRECEDING);

Тем не менее, моя лямбда по-прежнему вызывается каждые несколько секунд, хотя я установил окно на 30 секунд (хотя я понимаю, что мой SQL, вероятно, неисправен).

START RequestId: b3a32ef6-b699-4dfb-a087-05c0f0948b20 Version: $LATEST
2019-03-24T15:29:29.583252 <---------- HERE
END RequestId: b3a32ef6-b699-4dfb-a087-05c0f0948b20
REPORT RequestId: b3a32ef6-b699-4dfb-a087-05c0f0948b20  Duration: 1.56 ms   Billed Duration: 100 ms Memory Size: 512 MB Max Memory Used: 66 MB  

START RequestId: 4098a5e4-1d43-4a86-b297-4ddb86070cf6 Version: $LATEST
2019-03-24T15:29:32.369730 <---------- HERE
END RequestId: 4098a5e4-1d43-4a86-b297-4ddb86070cf6
REPORT RequestId: 4098a5e4-1d43-4a86-b297-4ddb86070cf6  Duration: 6.26 ms   Billed Duration: 100 ms Memory Size: 512 MB Max Memory Used: 66 MB  

START RequestId: 7f46a06c-0674-447d-89b7-ff8a3df756cd Version: $LATEST
2019-03-24T15:29:35.421609 <---------- HERE
END RequestId: 7f46a06c-0674-447d-89b7-ff8a3df756cd

Я ищу решение, которое будет

  1. Агрегируйте события за X минут, не применяя к ним никакой логики, или не меняйте их каким-либо образом
  2. Поток агрегированных событий вместе в лямбда-функцию
...