Azure Stream Analytics: несколько Windows + СОЕДИНЕНИЯ - PullRequest
0 голосов
/ 23 мая 2018

Моя архитектура:

  • 1 EventHub с 8 разделами и 2 TPU
  • 1 Задание потоковой аналитики
  • 6 Windows на основетот же вход (от 1 до 6 минут)

Пример данных:

{side: 'BUY', ticker: 'MSFT', qty: 1, price: 123, tradeTimestamp: 10000000000}
{side: 'SELL', ticker: 'MSFT', qty: 1, price: 124, tradeTimestamp:1000000000}

EventHub PartitionKey равен ticker

Я бы хотел каждую секунду выдавать следующие данные:

(Total quantity bought / Total quantity sold) in the last minute, last 2mn, last 3mn and more

Что я пробовал:

WITH TradesWindow AS (
    SELECT
        windowEnd = System.Timestamp,
        ticker,
        side,
        totalQty = SUM(qty)
    FROM [Trades-Stream] TIMESTAMP BY tradeTimestamp PARTITION BY PartitionId
    GROUP BY ticker, side, PartitionId, HoppingWindow(second, 60, 1)
),
TradesRatio1MN AS (
    SELECT 
        ticker = b.ticker,
        buySellRatio = b.totalQty / s.totalQty
    FROM TradesWindow b /* SHOULD I PARTITION HERE TOO ? */
    JOIN TradesWindow s /* SHOULD I PARTITION HERE TOO ? */
    ON s.ticker = b.ticker AND s.side = 'SELL'
    AND DATEDIFF(second, b, s) BETWEEN 0 AND 1
    WHERE b.side = 'BUY'
)

 /* .... More windows.... */

/* FINAL OUTPUT: Joining all the windows */
SELECT
   buySellRatio1MN = bs1.buySellRatio,
   buySellRatio2MN = bs2.buySellRatio
   /* more windows */
INTO [output]
FROM buySellRatio1MN bs1 /* SHOULD I PARTITION HERE TOO ? */
JOIN buySellRatio2MN bs2 /* SHOULD I PARTITION HERE TOO ? */
ON bs2.ticker = bs1.ticker
AND DATEDIFF(second, bs1, bs2) BETWEEN 0 AND 1

Проблемы:

  • Для этого требуется 6 групп потребителей EventHub (каждая может иметь только 5 читателей), почему?У меня нет 5x6 операторов SELECT на входе, почему тогда?
  • Вывод не выглядит согласованным (я не знаю, правильны ли мои JOINы).
  • Иногда работане выводится вообще (может быть какая-то проблема с разделением? см. комментарии в коде о разделении)

Вкратце, есть ли лучший способ добиться этого?Я не смог найти ничего в документе и примерах о наличии нескольких окон и объединении их, а затем о соединении результатов предыдущих объединений только с 1 входа.

1 Ответ

0 голосов
/ 01 июня 2018

Для первого вопроса это зависит от внутренней реализации логики горизонтального масштабирования.Подробнее здесь .

Для вывода соединения я не вижу весь запрос, но если вы присоединяете запрос с 1-минутным окном к запросу с 2-минутным окномс 1-секундным «буфером» вы будете выводить только каждые 2 минуты.Оператор UNION будет лучше для этого.

Исходя из вашего примера и вашей цели, я думаю, что есть гораздо более простой способ написать этот запрос с помощью UDA (User Defined Aggregate).

Для этого ясначала определит функцию UDA, называемую «коэффициент»:

function main() {
this.init = function () {
    this.sumSell = 0.0;
    this.sumBuy  = 0.0;
}

this.accumulate = function (value, timestamp) {
    if (value.side=="BUY") {this.sumBuy+=value.qty};
    if (value.side=="SELL") {this.sumSell+=value.qty};
   }

this.computeResult = function () {
    if(this.sumSell== 0) {
        result = 0;
    }
    else {
        result =  this.sumBuy/this.sumSell;
    }
    return result;

}
}

Затем я могу просто использовать этот запрос SQL для окна 60 секунд:

SELECT
  windowEnd = System.Timestamp,
  ticker,
  uda.ratio(iothub) as ratio
FROM iothub PARTITION BY PartitionId
GROUP BY ticker, PartitionId, SlidingWindow(second, 60)
...