Я переносу задание потоковой аналитики из Databricks / Spark в Azure Stream Analytics.Входные данные поступают из IoTHub, и запрос должен генерировать события каждый раз, когда значение датчика изменяется между пороговыми диапазонами (например, от «предупреждения» до «предупреждения»).
Существующее решение использует «потоковую передачу с отслеживанием состояния»,т.е. он хранит последнее состояние на устройство в памяти и сравнивает каждое новое сообщение.При запуске задания (или в некоторых дополнительных сценариях) отсутствует «последний статус»;в этом случае создается дополнительное событие, которое корректно обрабатывается нижестоящими компонентами.
Я пытаюсь реализовать эту функцию в ASA:
- Сравнение с последней записью может быть легкосделано с помощью
lag(value, 1, null) over (partition by(serialMachine) limit duration(minute, 60))
При тестировании с локальными входными данными результат, описанный выше, является пустым для первой записи, которую можно использовать для создания сообщения. Но при запуске в Azure "лаг" возвращает значение,
, даже если исходная запись для него имеет временную метку до настроенного времени начала задания .Я предполагаю, что это рассматривается как «время начала вывода», и все доступные или, по крайней мере, некоторые сообщения загружаются из IoTHub независимо от этой отметки времени.
Я пытался использовать функции ISFIRST и LAST, но все этиобратитесь к временному окну, т.е. производные условия будут выполняться периодически.Но мне это нужно только один раз.
Есть идеи для обхода?