У меня следующая ситуация.
В потоке приложений Kinesis у меня есть следующая структура.
введите описание изображения здесь
Таким образом, в местоположении (LOCATION) различные машины (MACHINE) могут отправлять статистику (CHARACTERISTIC) с текущим значением (VALUE) и некоторыми ограничениями (MIN_LIMIT, MAX_LIMIT).
В части Kinesis Analytics Iнужен шаблон SQL, который будет анализировать эти данные и вставлять записи в целевой поток, когда машина будет превышать лимит (MIN или MAX) несколько раз во временном окне, скажем, 1 час.
У меня есть что-то, но до сих пор оно не работает, как ожидалось.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(
"LOCATION" VARCHAR(50),
"MACHINE" varchar(50),
"CHARACTERISTIC" varchar(50),
"VALUE" DOUBLE,
"MIN_LIMIT" DOUBLE,
"MAX_LIMIT" DOUBLE,
"EMAIL" VARCHAR(100));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
(
"LOCATION",
"MACHINE",
"CHARACTERISTIC",
"VALUE",
"MIN_LIMIT",
"MAX_LIMIT"
"EMAIL",
"NUMBER_OF_HITS",
"TIME_WINDOW")
FROM "test_001"
WHERE (VALUE < MIN_LIMIT or VALUE > MAX_LIMIT)
WINDOW W1 AS (
PARTITION BY CHARACTERISTIC
RANGE INTERVAL TIME_WINDOW HOUR PRECEDING)
Может кто-нибудь помочь мне с этим? Спасибо