Использовать Kinesis Analytics для анализа событий и связанных с ними пропущенных событий, разделенных по времени? - PullRequest
0 голосов
/ 22 мая 2018

У меня есть поток событий для различных устройств, которые могут быть «подключены» или «отключены».

Т.е. событие имеет следующую структуру:

  • отметка времени
  • device_id
  • событие («подключено» или «отключено»)

Я хочу немедленно запустить действие, когда устройство было отключено и не подключено в течение (настраиваемого устройства) периода времени, например, 1 часа.Я хочу запускать только один раз для каждого «отключенного» события.

Это можно сделать с помощью AWS Kinesis Analytics, и если да, то как будет выглядеть запрос?Если нет, то можно ли это сделать с помощью какого-либо другого инструмента, или мне нужно создать его по индивидуальному заказу?

1 Ответ

0 голосов
/ 27 мая 2018

Это возможно с Drools Kinesis Analytics (управляемый сервис на Amazon):

Типы:

package com.test;

import java.util.Set;

import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;

declare DeviceConfig
    @DynamoDBTable(tableName="DeviceConfig")

    deviceId: int @DynamoDBHashKey(attributeName="device_id");
    timeoutMillis: int @DynamoDBAttribute(attributeName="timeout_millis");
end

declare DeviceEvent
@role( event )
    // attributes 
    deviceId: int;
    timestamp: java.util.Date;
    event: String;
end

declare DisconnectAlert
    deviceId: int;
end

Правила:

package com.test;

// setup dynamic timer
rule "disconnect timer"
    timer( expr: $timeout )
when
    $event : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
    DeviceConfig(deviceId == $event.deviceId, $timeout : timeoutMillis) from entry-point configs
then
    insertLogical(new DisconnectAlert($event.getDeviceId()));
end

rule "remove dups"
when
    $event : DeviceEvent( $id : deviceId, $state : event ) from entry-point events
    $dup : DeviceEvent(this != $event, deviceId == $event.deviceId, event == $state, this after $event) from entry-point events
then
    delete($dup);
end

// on connect event remove "disconnected" state
rule "connect device"
when
    $disconnected : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
    DeviceEvent(deviceId == $disconnected.deviceId, event == "connected", this after $disconnected) from entry-point events
then
    delete($disconnected);
end

// cleanup "connected" state to free up memory (not needed any more)
rule "delete connected state"
when
    $connected : DeviceEvent(event == "connected") from entry-point events
then
    delete($connected);
end

Обратите внимание, что существует 2 типа входов:

  • DeviceConfig , который в основном представляет собой статическую конфигурацию устройства, расположенную в DynamoDB.
  • DeviceEvent , который представляет собой поток событий устройства Kinesis.
...