Каков рекомендуемый способ для Azure Stream Analytics обрабатывать / маршрутизировать сообщения IoT различных размеров и содержимого с того же устройства - PullRequest
0 голосов
/ 07 июня 2019

Сначала мы хотим отправить (и зарегистрировать) сообщения о калибровке с одного из наших устройств. Затем, после того как он полностью откалиброван, мы хотим отправлять обычные сообщения телеметрии IoT.

Рабочий процесс: данные передаются из IoT Hub в Azure Stream Analytics, а затем в базу данных Azure Sql.

Содержание сообщения IoT может меняться в зависимости от того, калибруется устройство или нет. Чтобы определить калибровку устройства, мы изменим поле имени этого устройства на «калибровка». Затем для отправки обычных данных телеметрии имя устройства вернется к содержанию его имени.

Следовательно, в ASA, чтобы определить, является ли сообщение сообщением о калибровке: мы проверяем содержимое поля имени устройства.

Итак, проблему нужно решить примерно так:

if message-contents.device-name = 'calibration' then
 get these 10 fields from the message
 write them to Calibration DB
else
 get these different 5 fields from the message
 call a ML function with some of these input fields
 write result to Telemetry DB
end

Я попытался написать код Azure Stream Analytics, который использует инструкцию CASE.

WITH ALLMESSAGES AS ( 
SELECT *
FROM iothubinput2018aug21 ),

QUERY1 AS (
SELECT try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME, 
       try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
       try_cast(devicenumber as nvarchar(max)) as DEVICE_UID,
       try_cast(emptymax as nvarchar(max)) as EMPTYMAX,
       try_cast(emptymin as nvarchar(max)) as EMPTYMIN,
       try_cast(emptysum as nvarchar(max)) as EMPTYSUM,
       try_cast(fullmax as nvarchar(max)) as FULLMAX,
       try_cast(fullmin as nvarchar(max)) as FULLMIN,
       try_cast(fullsum as nvarchar(max)) as FULLSUM,
       try_cast(emptyresult as nvarchar(max)) as EMPTYRESULT,
       try_cast(fullresult as nvarchar(max)) as FULLRESULT,
       try_cast(fw as nvarchar(max)) as FIRMWARE_VERSION,
       TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
       0 AS REC_READ
FROM ALLMESSAGES
WHERE DEVICE_NAME = 'calibration'),

QUERY2 AS (
SELECT
       try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME, 
       try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
       try_cast(uid as nvarchar(max)) as DEVICE_UID,
       try_cast(weight as nvarchar(max)) as DEVICE_SENSOR_READINGS,
       TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
       MLFunctionBatch2019Apr10(DEVICE_MAC_ID, DEVICE_SENSOR_READINGS) AS RESULT,
       UDF.SUMOFREADINGS(DEVICE_SENSOR_READINGS) AS DEVICE_SENSOR_READINGS_SUM,
       0 AS REC_READ
FROM ALLMESSAGES
WHERE DEVICE_NAME != 'calibration')

SELECT DEVICE_NAME, 
       DEVICE_MAC_ID,
       DEVICE_UID,
       DEVICE_SENSOR_READINGS,
       ASA_POSTTIME, 
       CASE result."Scored Labels" WHEN '1' THEN 'FULL' ELSE 'EMPTY' END AS MLSVC_RESULT,
       REC_READ,
       DEVICE_SENSOR_READINGS_SUM
INTO OUT2SQLDBDEV                
FROM QUERY2
WHERE DEVICE_MAC_ID != 'calibration'

SELECT DEVICE_NAME, 
       DEVICE_MAC_ID,
       DEVICE_UID,
       EMPTYMAX,
       EMPTYMIN,
       EMPTYSUM,
       FULLMAX,
       FULLMIN,
       FULLSUM,
       EMPTYRESULT,
       FULLRESULT,
       FIRMWARE_VERSION,
       ASA_POSTTIME,
       REC_READ
INTO OUT2SQLDBCALIBRATIONDEV
FROM QUERY1
WHERE DEVICE_MAC_ID = 'calibration'

Я ожидаю, что вывод перейдет в нужную таблицу БД в зависимости от того, является ли имя устройства калибровочным или что-то еще. Это не происходит Нет выходных данных. И с другой попытки запрос ASA завершается неудачно, поскольку поле базы данных ожидает значения NOT NULL, но запрос отправляет значение NULL.

1 Ответ

0 голосов
/ 11 июня 2019

Мне нужно было объединить значения, чтобы убедиться, что они НЕ НУЛЬЫ, тогда запросы работали. Пожалуйста, смотрите ниже.

WITH ALLMESSAGES AS ( 
SELECT *
FROM iothubinput2018aug21 ),

QUERY1 AS (
SELECT
       try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME, 
       try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
       try_cast(uid as nvarchar(max)) as DEVICE_UID,
       try_cast(weight as nvarchar(max)) as DEVICE_SENSOR_READINGS,
       TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
       '1' AS RESULT,
        UDF.SUMOFREADINGS(try_cast(weight as nvarchar(max))) AS DEVICE_SENSOR_READINGS_SUM,
       0 AS REC_READ
FROM ALLMESSAGES
WHERE COALESCE(deviceid,'NULL') != 'calibration'), 

QUERY2 AS (
SELECT try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME, 
       try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
       try_cast(devicenumber as nvarchar(max)) as DEVICE_UID,
       try_cast(emptymax as nvarchar(max)) as EMPTYMAX,
       try_cast(emptymin as nvarchar(max)) as EMPTYMIN,
       try_cast(emptysum as nvarchar(max)) as EMPTYSUM,
       try_cast(fullmax as nvarchar(max)) as FULLMAX,
       try_cast(fullmin as nvarchar(max)) as FULLMIN,
       try_cast(fullsum as nvarchar(max)) as FULLSUM,
       try_cast(emptyresult as nvarchar(max)) as EMPTYRESULT,
       try_cast(fullresult as nvarchar(max)) as FULLRESULT,
       try_cast(fw as nvarchar(max)) as FIRMWARE_VERSION,
       TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
       0 AS REC_READ
FROM ALLMESSAGES
WHERE COALESCE(deviceid,'NULL') = 'calibration')

SELECT DEVICE_NAME, 
       DEVICE_MAC_ID,
       DEVICE_UID,
       DEVICE_SENSOR_READINGS,
       ASA_POSTTIME, 
       'FULL' AS MLSVC_RESULT,
       REC_READ,
       DEVICE_SENSOR_READINGS_SUM
INTO OUT2SQLDBDEV                
FROM QUERY1
WHERE COALESCE(DEVICE_NAME, 'NULL') != 'calibration'

SELECT DEVICE_NAME, 
       DEVICE_MAC_ID,
       DEVICE_UID,
       DEVICE_SENSOR_READINGS,
       ASA_POSTTIME, 
       'FULL' AS MLSVC_RESULT,
       REC_READ,
       DEVICE_SENSOR_READINGS_SUM
INTO OUT2BLOB2018OCT24              
FROM QUERY1
WHERE COALESCE(DEVICE_NAME, 'NULL') != 'calibration'

SELECT DEVICE_NAME, 
       DEVICE_MAC_ID,
       DEVICE_UID,
       EMPTYMAX,
       EMPTYMIN,
       EMPTYSUM,
       FULLMAX,
       FULLMIN,
       FULLSUM,
       EMPTYRESULT,
       FULLRESULT,
       FIRMWARE_VERSION,
       ASA_POSTTIME,
       REC_READ
INTO OUT2SQLDBCALIBRATIONDEV
FROM QUERY2
WHERE COALESCE(DEVICE_NAME, 'NULL') = 'calibration'
...