У меня есть простой искровой sql с временным окном 5 минут, политика триггера каждую минуту:
val withTime = eventStreams(0).selectExpr("*", "cast(cast(parsed.time as long)/1000 as timestamp) as event_time")
val momentumDataAggQuery = withTime
.selectExpr("parsed.symbol", "parsed.bid", "parsed.ask", "event_time")
.withWatermark("event_time", "1 minutes")
.groupBy(col("symbol"), window(col("event_time"), "5 minutes", "60 seconds")) // change to 60 minutes
.agg(first("bid", true).as("first_bid"), first("ask").as("first_ask"), last("bid").as("last_bid"), last("ask").as("last_ask"))
val momentumDataQuery = momentumDataAggQuery
.selectExpr("window.start", "window.end", "ln(((last_bid + last_ask)/2)/((first_bid + first_ask)/2)) as momentum", "symbol")
Когда есть данные из потока, он запускается каждую минуту, чтобы вычислить «импульс», ноостановка, когда нет данных.Я ожидаю, что он будет продолжать использовать старые данные для обновления каждую минуту, даже если не хватает точки данных.
Рассмотрим пример в следующей таблице
В 1-м окне есть только одна точка данныхпоэтому возврат журнала равен нулю.
Во 2-м окне есть только две точки данных, поэтому требуется журнал (97.5625 / 97.4625), где 97.5625 было получено в 11:53, а 97.4625 было получено в 11:52:10, во временном окне 12:19 <> 12:54 ... он продолжал вычислять возврат журнала, когда было достаточно точки данных.
Однако, когда после 15 не было больше точки данных:56:12, скажем, для окна 12:54 <> 12:59, я ожидаю, что потребуется ln (97,8625 / 97,6625), где входные данные были сгенерированы в 11:56:12 и 11:54:11 соответственно.Однако это не так, красное поле никогда не генерировалось.
Что-то не так с моей искрой sql?