У нас есть таблица в кусте, в которой данные о торговых приказах для каждого конца дня хранятся как order_date.Другими важными столбцами являются продукт , контракт , направление (то есть покупка и продажа), цена (цена размещенного заказа), ttime (время транзакции) и время выхода (когда ордер будет удален с рынка в тот же день).
Нам необходимо построить диаграмму в виде тиковых данных из основноготаблица с максимальными и минимальными ценами для каждой строки (ордера) с утра, когда рынок открылся до этого времени.Это должно быть для каждого продукта, контракта и направления, т.е. максимальные и минимальные цены для этого продукта, контракта и направления (покупка или продажа).
Наряду с тем, что мы должны исключить любые заказы из окна агрегациикоторый был закрыт / удален, т. е. у которого время выхода меньше ttime для строки.
Ожидаемое решение: При упорядоченном наборе данных на ttime мы должны получить максимальную и минимальную цены дляконкретный продукт, контракт и направление (maxPrice для покупки и продажи) для каждой строки (заказа) с утра до времени этого заказа.Таким образом, в SQL он будет работать с самостоятельным соединением и будет выглядеть следующим образом:
Это будет работать для каждого набора данных eod (order_date) в пакете:
select mainSet.order_id,mainSet.product,mainSet.contract,mainSet.order_date,mainSet.price,mainSet.ttime,mainSet.direction,
max(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.direction,mainSet.ttime) as max_price,
min(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.direction,mainSet.ttime) as min_price
from order_table mainSet
join order_table aggSet
ON (mainSet.produuct=aggSet.product
mainSet.contract=aggSet.contract
mainSet.direction=aggSet.direction
mainSet.ttime>=aggSet.ttime
mainSet.ttime < aggSet.exittime)
Статистика объема:
среднее количество 7 миллионов записей
среднее количество для каждой группы (продукт, контракт, направление) = 600K
InSpark aboove SQL-код выглядит следующим образом:
val mainDF: DataFrame= sparkSession.sql("select * from order_table where order_date ='eod_date' ")
val maxPriceCol = max(col("price")).over(Window.partitionBy(col("product"),col("contract"),col("direction")))
val minPriceCol = min(col("price")).over(Window.partitionBy(col("product"),col("contract"),col("direction")))
val ndf=mainDf.alias("mainSet").join(mainDf.alias("aggSet"),
(col("mainSet.product")===col("aggSet.product")
&& col("mainSet.contract")===col("aggSet.contract")
&& col("mainSet.direction")===col("aggSet.direction")
&& col("mainSet.ttime")>= col("aggSet.ttime")
&& col("mainSet.ttime") < col("aggSet.exittime"))
,"inner")
.withColumn("max_price",maxPriceCol)
.withColumn("min_price",minPriceCol)
Хотя я преобразовываю вышеупомянутый SQL-код в искровой dataframe, он просто не завершается. Работа застревает, и много раз у меня возникают проблемы с памятью Container killed by YARN for exceeding memory limits. 4.9 GB of 4.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
Я пытался увеличить память и другие параметры, но не повезло.Я попытался перераспределить (товар, контракт, направление) сразу после создания фрейма данных из sql, так как в нем по умолчанию 200 разделов, но мне тоже не повезло.
mainDF.repartitionByRange("product","contract","direction")
Буду очень признателен за любое решение/ новый подход к этой проблеме.