Я воспроизвел ваш случай и предложил следующий запрос:
WITH AllEvents AS (
SELECT
*
FROM
Input
),
EventB AS (
select
EventB
From AllEvents
Where EventB Is Not NUll
),
EventC AS (
select EventC, EventC.Time
from AllEvents
Where EventC Is Not NUll
),
test as (
Select *, EventC.* From EventB
Inner Join EventC
On DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri)
select topone() over (order by Time) from test GROUP BY TumblingWindow(second, 10)
Для серии событий он всегда будет возвращать последнюю комбинацию пары (EventC, EventB), которая соответствует. Если это не ваш ожидаемый результат, не могли бы вы, пожалуйста, для указанного выше ввода написать ожидаемый результат?
Я использовал VS2019 и расширение Stream Analytics. Я указал локальные входы в соответствии с вашим описанием выше.
Обновление
Запрос был обновлен. Я заметил, что только ваш последний EventC в примере полезной нагрузки содержит свойство «Time». Имея это свойство для каждого события C и используя приведенный выше запрос, вы получите «Wasserburg» в результате.
Конечно, выходные данные должны быть отформатированы, но результат в этом случае правильный.
Дальнейшее обновление Я немного поиграл с этим, поскольку нашел его действительно интересным, и предложил следующий запрос, который концептуально отличается от предыдущего, и я бы сказал, еще более точным:
-- timestamp by how events are enqueued
WITH AllEvents AS (
SELECT
Input
FROM
Input timestamp by input.EventEnqueuedUtcTime
),
-- get the last eventB, because only last eventB is relevant
EventB AS (
select last(AllEvents.Input) over (limit duration(minute, 1) when AllEvents.input.EventB Is Not NUll) as EventB
From AllEvents
),
LastB as (select topone() over (order by EventB.Time) from EventB GROUP BY slidingwindow(second, 60)),
-- get the last eventC
EventC AS (
select last(AllEvents.Input) over (limit duration(minute, 1) when AllEvents.input.EventC Is Not NUll) as EventC
From AllEvents
),
LastC as (select topone() over (order by EventC.Time) from EventC GROUP BY slidingwindow(second, 60)),
-- create the result if the join between last EventB and last EventC exists
ResultJoin as (
Select LastB.topone.*, LastC.topone.* From LastB
Inner Join LastC
On DATEDIFF(second, LastB, LastC) BETWEEN 0 AND 60
AND LastB.topone.EventB.EventB.Claim.Uri = LastC.topone.EventC.EventC.Claim.EventBUri)
-- get the last event that is a pair of EventB,EventC
select topone() over (order by EventB.Time) into Output from ResultJoin GROUP BY slidingwindow(second, 60)
-- Just a cross-check what is the last event B
select * into Output1 from LastB
Я использовал функции временного окна, потому что вы упомянули, что события приходят в течение минутного периода времени. По сути, идея состоит в том, чтобы извлечь последнее событие B и событие lastC, а затем сопоставить одно из них для распространения на выход.
Я проверил его на реальном концентраторе событий с издателем сообщений концентратора событий, чтобы я мог смоделировать поток событий, как в вашем примере:
Ипосле этого я смотрел вывод локально, чтобы увидеть, получу ли я правильный результат после последнего события:
Кроме того, я добавил свойство time вкаждое событие (B и C), как вы можете видеть из симулятора сообщений, потому что это свойство используется для упорядочения событий в запросе. Конечно, вы можете заменить это другим свойством, таким как EventEnqueuedUtcTime или чем-то подобным.
Надеюсь, вы найдете один из этих двух разных подходов полезным для вашего окончательного решения.