Как сохранить возвращаемое значение UDF для входящих событий, используя Stream Analytics? - PullRequest
0 голосов
/ 23 января 2019

Мне нужно перевести приведенный ниже код C # во что-то, что может использоваться Azure Stream Analytics.

У меня есть приложение на C #, похожее на следующее:

var inputEvents = new List<Event>();
foreach (var file in files){
    (List<Event> events, DateTime maxDate) = ProcessEvents(file, inputEvents);
    inputEvents = events.Where(e => e.Duration == null).ToList();
}

Где ProcessEvents () передает inputEvents другим вспомогательным методам

Мне нужно реализовать весь этот код с помощью Stream Analytics.Часть var file реализована путем сбора группы событий с использованием Collect ().Каждая партия отправляется в UDF, которая действует как ProcessEvents () .Однако ProcessEvents () возвращает другие события, необходимые для следующей итерации.Поскольку UDF не имеет состояния, следующий пакет не сможет использовать возвращенные события из предыдущего пакета.

Как переписать код C #, указанный выше в Stream Analytics?

попробовал следующее:

  • Использовать UDA для хранения возвращенных событий.Ошибка, потому что по какой-то причине он не может хранить массив JSON и постоянно изменять его.
  • Использовать ввод справочных данных для хранения возвращенных событий.Не удалось, поскольку они могут использоваться только в Stream Analytics с использованием JOIN, а не в UDF.

Запрос T-SQL Stream Analytics:

WITH eventsCollection AS (SELECT COLLECT() AS allEvents
FROM EventHubStreamMessage
GROUP BY SessionWindow(minute,2,4)),

step1 AS (
    SELECT UDF.SampleProcessEvents(allEvents) as Source
    FROM eventsCollection
)

SELECT *
INTO [StorageTable]
FROM step1 

Код UDF Stream Analytics (краткийверсия):

function main(allEvents){
    allEvents = JSON.stringify(allEvents);
    var inputEvents = new Array ();

    return processEvents(JSON.parse(allEvents), inputEvents)
}

function processEvents(allEvents, inputEvents){
    for (i=0l i<allEvents.length; i++){
        if (allEvents[i].Event =="ON"){
            powerOn(inputEvents);
        }
    }
}

function powerOn(inputEvents){
    return true;
}
...