Как проанализировать массив массивов с помощью Siddhi - PullRequest
0 голосов
/ 09 января 2020

Я пытаюсь проанализировать поток событий JSON, используя WSO2 Streaming Integrator / Siddhi, но, похоже, не удалось правильно загрузить события в поток. Я использую dockerimage wso2/streaming-integrator:1.0.0 для запуска этого.

Я посылаю следующий поток ввода в SiddhiApp (как HTTP POST в конечную точку Siddhi):

[
    [ 1502942400000, "4261.480" ],
    [ 1503982400000, "1010.312" ]
]

Мое определение потока выглядит следующим образом:

@source(type='http', receiver.url='http://0.0.0.0:5010/stream', @map(type='json', @attributes(
        timestamp = "[0]", sensorvalue = "[1]"
        )))
define stream SensorAlertsStream (
    timestamp long,
    sensorvalue double,

Я использую почтальон для отправки данных в WSO2 Streaming Integrator, но все, что я получаю, это ошибки или отсутствие ответа.

Я пытался следующая конфигурация атрибутов, все с неудачными результатами:

 - timestamp = "$.[0]", sensorvalue = "$.[1]"
 - timestamp = "$[0]", sensorvalue = "$[1]"
 - timestamp = "$[0][0]", sensorvalue = "$[0][1]"
 - timestamp = "[0]", sensorvalue = "[1]"

Я что-то упустил в определении источника или атрибуте отображения, или я должен сделать вывод, что эта структура входных данных массива массивов просто не может быть проанализирована с помощью siddhi- json отображение?

Если это поможет; это мое упрощенное определение приложения:

@App:name("SO-SensorApp")

@source(type='http', receiver.url='http://0.0.0.0:5010/stream', @map(type='json', enclosing.element="$[0]", @attributes( timestamp = "[0]", sensorvalue = "[1]" )) )
define stream SensorAlertsStream ( timestamp long, sensorvalue double );

@sink(type='log', @map(type='json')) 
define stream log_values( sensorvalue double );

from SensorAlertsStream select sensorvalue insert into log_values;

1 Ответ

0 голосов
/ 09 января 2020

Я протестировал это в Siddhi Tooling 5.1.2 с siddhi-map- json -5.0.6, следующие конфиги решат проблему, enclosureing.element- '$', timestamp = "[0]", sensorvalue = "[1]"

@source(type = 'http', receiver.url = 'http://0.0.0.0:8007/stream', 
    @map(type = 'json', enclosing.element = "$", 
        @attributes( timestamp = "[0]", sensorvalue = "[1]" )) )
@sink(type = 'log') 
define stream SensorAlertsStream1 ( timestamp long, sensorvalue double );
...