Azure Stream Analytics Query для объединения двух событий - PullRequest
0 голосов
/ 28 октября 2019

Я пытаюсь объединить два различных события (EventB и EventC), которые поступают из одного входа EventHub. Чего я хочу добиться, так это выводить (функция Azure) консолидированное событие (EventB + EventC) всякий раз, когда получено EventC.

Вот как выглядят события:

{
    "EventB": {
        "Claim": {
            "EventAUri": "A/123",
            "Uri": "B/456"
        },
        "Metainfo": {
            "Color": "Green"
        }
    }   
}

и

{
    "EventC" : {
        "Claim": {
            "EventBUri": "B/456"
        },
        "Target": {
            "City": "Berlin",
            "Position": {
                "Latitude": 50.325096,
                "Longitude": 72.19710
            }
        }
    }
}

EventB будет отправлено только один раз, тогда как EventC будет отправлено несколько раз в минуту,Желаемый результат для приведенного выше примера:

    {
        "Claim": {
            "EventBUri": "B/456"
        },
        "Target": {
            "City": "Berlin",
            "Position": {
                "Latitude": 50.325096,
                "Longitude": 72.19710
            }
        },
        "BMetainfo": {
            "Color": "Green"
        }
    }

Это то, что я пробовал до сих пор:

WITH AllEvents AS (
    SELECT 
        *
    FROM
        ehubinput
),
EventB AS (
select
    EventB
From AllEvents
Where EventB Is Not NUll
),
EventC AS (
    select EventC
from AllEvents
Where EventC Is Not NUll
)

Select * From EventB 
 Inner Join EventC 
On DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5 
AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri

К сожалению, мой код будет выводить EventB + x (для каждого EventC) * EventC вместо EventB + Last EventC ....

Может кто-нибудь помочь мне с этим?

Обновление:

Этоэто мой вход .

Это мой текущий выход . (Я хочу, чтобы только последний EventC сочетался с EventB, а не с каждым событием в потоке, как я это делаю сейчас)

1 Ответ

1 голос
/ 31 октября 2019

Я воспроизвел ваш случай и предложил следующий запрос:

WITH AllEvents AS (
  SELECT 
    *
  FROM
  Input
),
EventB AS (
 select
 EventB
 From AllEvents
 Where EventB Is Not NUll
),
EventC AS (
  select EventC, EventC.Time
  from AllEvents
  Where EventC Is Not NUll
),
test as (
  Select *, EventC.* From EventB 
  Inner Join EventC 
  On DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5 
 AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri)


select topone() over (order by Time) from test  GROUP BY TumblingWindow(second, 10)   

Для серии событий он всегда будет возвращать последнюю комбинацию пары (EventC, EventB), которая соответствует. Если это не ваш ожидаемый результат, не могли бы вы, пожалуйста, для указанного выше ввода написать ожидаемый результат?

Я использовал VS2019 и расширение Stream Analytics. Я указал локальные входы в соответствии с вашим описанием выше.

Обновление

Запрос был обновлен. Я заметил, что только ваш последний EventC в примере полезной нагрузки содержит свойство «Time». Имея это свойство для каждого события C и используя приведенный выше запрос, вы получите «Wasserburg» в результате.

Конечно, выходные данные должны быть отформатированы, но результат в этом случае правильный.

Дальнейшее обновление Я немного поиграл с этим, поскольку нашел его действительно интересным, и предложил следующий запрос, который концептуально отличается от предыдущего, и я бы сказал, еще более точным:

-- timestamp by how events are enqueued
WITH AllEvents AS (
    SELECT 
       Input
     FROM 
     Input timestamp by input.EventEnqueuedUtcTime  
    ),

-- get the last eventB, because only last eventB is relevant
EventB AS (
    select last(AllEvents.Input) over (limit duration(minute, 1)  when AllEvents.input.EventB Is Not NUll)  as EventB 
    From AllEvents 
 ),
 LastB as (select topone() over (order by EventB.Time) from EventB GROUP BY  slidingwindow(second, 60)),

 -- get the last eventC
 EventC AS (
    select last(AllEvents.Input) over (limit duration(minute, 1)  when AllEvents.input.EventC Is Not NUll)  as EventC 
    From AllEvents 
 ),
LastC as (select topone() over (order by EventC.Time) from EventC GROUP BY  slidingwindow(second, 60)),

-- create the result if the join between last EventB and last EventC exists
ResultJoin as (
   Select LastB.topone.*, LastC.topone.* From LastB 
   Inner Join LastC 
   On DATEDIFF(second, LastB, LastC) BETWEEN 0 AND 60 
   AND LastB.topone.EventB.EventB.Claim.Uri  = LastC.topone.EventC.EventC.Claim.EventBUri)

-- get the last event that is a pair of EventB,EventC
select topone() over (order by EventB.Time) into Output from ResultJoin  GROUP BY  slidingwindow(second, 60)

-- Just a cross-check what is the last event B
select * into Output1 from LastB

Я использовал функции временного окна, потому что вы упомянули, что события приходят в течение минутного периода времени. По сути, идея состоит в том, чтобы извлечь последнее событие B и событие lastC, а затем сопоставить одно из них для распространения на выход.

Я проверил его на реальном концентраторе событий с издателем сообщений концентратора событий, чтобы я мог смоделировать поток событий, как в вашем примере: enter image description here

Ипосле этого я смотрел вывод локально, чтобы увидеть, получу ли я правильный результат после последнего события:

enter image description here

Кроме того, я добавил свойство time вкаждое событие (B и C), как вы можете видеть из симулятора сообщений, потому что это свойство используется для упорядочения событий в запросе. Конечно, вы можете заменить это другим свойством, таким как EventEnqueuedUtcTime или чем-то подобным.

Надеюсь, вы найдете один из этих двух разных подходов полезным для вашего окончательного решения.

...