Окно агрегации Stream Analytics - PullRequest
       45

Окно агрегации Stream Analytics

0 голосов
/ 13 декабря 2018

Мне нужна помощь \ совет о том, как игнорировать старые события при выполнении агрегации в расширенном окне.У меня есть данные о продажах, которые передаются в Event Hub.Концентратор событий используется в качестве входного потока.Мне нужно произвести две метрики - агрегация 30 секунд (акробатика) - совокупная стоимость продаж за целый день, т. Е. От открытия ворот

Время открытия ворот является переменным (динамическим), поэтому я считываю эталонный набор данных из большого двоичного объекта;и присоединиться к дате и времени Gateopen к потоку продаж.30-секундная агрегация по падающему окну работает нормально.Учитывая, что ворота открыты, переменная;В настоящее время я использую 12-часовое окно переключения с 30-секундным переходом и пытаюсь ограничить событие, которое будет агрегировано, используя логику EventProcessDatetime> GateOpen.

SELECT 
        Dateadd(ss,-30,System.Timestamp )  AS TimeSliceUTCStart
        , System.Timestamp AS TimeSliceUTCEnd   
        , p.Section                                       AS Section
        , SUM(CASE WHEN p.Classification = 'Retail' 
                AND p.ActivityDateTime > p.GateOpen THEN p.[sales_amt_gross] ELSE 0 END)    AS SaleTotalRetail


   FROM FilteredBase p 
   GROUP BY 
          p.Section
            , HoppingWindow(Duration(Hour, 12), hop(second, 30),Offset(millisecond, -1)) 

Проблема: я получаю агрегированные продажи за предыдущий день \ timeslice,В целом результат, которого я пытаюсь достичь, прост.Магазин может быть открыт в течение 5,8,10 или 12 часов максимум.Мы хотим иметь возможность узнавать продажи как в прямом эфире, для каждого раздела в течение дня.Любой совет или совет будет высоко ценится.

1 Ответ

0 голосов
/ 27 декабря 2018

Интуитивно понятный запрос выглядит хорошо, но под прикрытием происходит то, что Azure Stream Analytics использует файл справочных данных, который действовал на момент каждого временного окна.Затем, когда он видит четное число предыдущего дня, он будет использовать контрольные данные, представленные в это время (что может сделать сравнение p.ActivityDateTime> p.GateOpen True для предыдущего времени открытия).

Iизменил запрос следующим образом (предположим, у вас есть 1 открытое событие в день на раздел).Дайте мне знать, если это работает для вас.Если этого не произойдет, вы можете отправить пример данных, чтобы я мог соответствующим образом изменить запрос.Мы рассмотрим, как сделать эти запросы проще для написания.

WITH thirdtysecReporting AS
(
    SELECT
        p.Section Section,
        DATETIMEFROMPARTS(DATEPART(year, System.Timestamp), DATEPART(month, System.Timestamp), DATEPART(day, System.Timestamp), 0, 0, 0, 0) as date,
        System.Timestamp Windowend,
        SUM(p.sales_amt_gross) thirtysecSales
    FROM input TIMESTAMP BY p.ActivityDateTime
    GROUP BY TumblingWindow(second, 30), p.Section
)

,hopping AS
(
    SELECT
        Section,
        System.Timestamp HopEnd,
        date,
        SUM(thirtysecSales) SumSales
    FROM thirdtysecReporting
    GROUP BY HoppingWindow(second, 86400, 30), Section, date -- Hopping on 24 hours, reported every 30 second
)

,filtered as -- This step ignores data from the previous day
(
    SELECT 
        Section,
        HopEnd,
        date,
        SUMQt = CASE
            WHEN DAY(HopEnd) = DAY(date) OR DATEPART(hour, HopEnd) = DATEPART(hour, date) THEN SumSales
            ELSE 0
        END
    FROM hopping
)

SELECT Section, -- Final query
        HopEnd,
        MAX(SUMQt) AS SumQt
FROM filtered
GROUP BY TumblingWindow(hour, 1), Section, hopend

Спасибо,

JS - Azure Stream Analytics

...