Azure Stream Analytics альтернатива Sparks mapWithState - PullRequest
1 голос
/ 23 марта 2019

Есть ли способ в Azure Stream Analytics для создания какого-либо агрегата с пользовательским состоянием, как это делает Sparks mapWithState?

Вот мой сценарий:

У меня есть данные с устройств IoT, содержащие следующиеполя:

  • DeviceId
  • Позиция
  • Значение

Данные могут поступать не в порядке.

Всякий раз, когдановый пакет прибывает для данного DeviceId, я хочу вывести последние n позиций и значений для этого устройства.Как

Ввод: { "DeviceId": "A", "Position": 10, "Value": 100}

Выход: { "DeviceId": "A", "Positions": [10], "Value": [100]}


Следующий ввод: { "DeviceId": "A", "Position": 11, "Value": 101}

Выход: { "DeviceId": "A", "Positions": [10, 11], "Value": [100, 101]}


Следующий ввод: { "DeviceId": "A", "Position": 9, "Value": 99}

Вывод: { "DeviceId": "A", "Positions": [9, 10, 11], "Value": [9, 100, 101]}

В Spark Structured Streaming я бы реализовал это, используя groupBy и mapWithState.Есть ли способ реализовать это в ASA?

1 Ответ

1 голос
/ 27 марта 2019

в ASA вы можете использовать один из следующих методов, чтобы сделать это:

  • если у вас есть дополнительный столбец, который можно использовать для TIMESTAMP, вы можно использовать TIMESTAMP BY и ASA будет переупорядочивать события. Тогда ты можешь используйте LAG для получения последних событий для этого конкретного устройства.
  • без столбца отметок времени, вы можете создать COLLECTTOP оператор и упорядочить события в соответствии с вашим столбцом «Позиция»
  • в качестве альтернативы, вы можете реализовать свою собственную логику с состоянием, используя пользовательские агрегаты (UDA), как описано здесь .

Дайте мне знать, если вам нужна помощь для реализации одного из этих 3 методов. Я буду рад предоставить дополнительную информацию.

Спасибо

JS

...