Преобразование искры для набора временных рядов / тиков - PullRequest
0 голосов
/ 20 сентября 2019

У нас есть таблица в кусте, в которой данные о торговых приказах для каждого конца дня хранятся как order_date.Другими важными столбцами являются продукт , контракт , направление (то есть покупка и продажа), цена (цена размещенного заказа), ttime (время транзакции) и время выхода (когда ордер будет удален с рынка в тот же день).

Нам необходимо построить диаграмму в виде тиковых данных из основноготаблица с максимальными и минимальными ценами для каждой строки (ордера) с утра, когда рынок открылся до этого времени.Это должно быть для каждого продукта, контракта и направления, т.е. максимальные и минимальные цены для этого продукта, контракта и направления (покупка или продажа).

Наряду с тем, что мы должны исключить любые заказы из окна агрегациикоторый был закрыт / удален, т. е. у которого время выхода меньше ttime для строки.

Ожидаемое решение: При упорядоченном наборе данных на ttime мы должны получить максимальную и минимальную цены дляконкретный продукт, контракт и направление (maxPrice для покупки и продажи) для каждой строки (заказа) с утра до времени этого заказа.Таким образом, в SQL он будет работать с самостоятельным соединением и будет выглядеть следующим образом:

Это будет работать для каждого набора данных eod (order_date) в пакете:

select mainSet.order_id,mainSet.product,mainSet.contract,mainSet.order_date,mainSet.price,mainSet.ttime,mainSet.direction,
max(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.direction,mainSet.ttime) as max_price,
min(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.direction,mainSet.ttime) as min_price
from order_table mainSet 
join order_table aggSet
ON (mainSet.produuct=aggSet.product
mainSet.contract=aggSet.contract
mainSet.direction=aggSet.direction
mainSet.ttime>=aggSet.ttime
mainSet.ttime < aggSet.exittime)

Статистика объема:

среднее количество 7 миллионов записей

среднее количество для каждой группы (продукт, контракт, направление) = 600K

InSpark aboove SQL-код выглядит следующим образом:

val mainDF: DataFrame= sparkSession.sql("select * from order_table where order_date ='eod_date' ")
val maxPriceCol     = max(col("price")).over(Window.partitionBy(col("product"),col("contract"),col("direction")))
val minPriceCol     = min(col("price")).over(Window.partitionBy(col("product"),col("contract"),col("direction")))

val ndf=mainDf.alias("mainSet").join(mainDf.alias("aggSet"),
    (col("mainSet.product")===col("aggSet.product")
      && col("mainSet.contract")===col("aggSet.contract")
      && col("mainSet.direction")===col("aggSet.direction")
      && col("mainSet.ttime")>= col("aggSet.ttime")
      && col("mainSet.ttime") < col("aggSet.exittime"))
    ,"inner")
    .withColumn("max_price",maxPriceCol)
    .withColumn("min_price",minPriceCol)

Хотя я преобразовываю вышеупомянутый SQL-код в искровой dataframe, он просто не завершается. Работа застревает, и много раз у меня возникают проблемы с памятью Container killed by YARN for exceeding memory limits. 4.9 GB of 4.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead

Я пытался увеличить память и другие параметры, но не повезло.Я попытался перераспределить (товар, контракт, направление) сразу после создания фрейма данных из sql, так как в нем по умолчанию 200 разделов, но мне тоже не повезло.

mainDF.repartitionByRange("product","contract","direction")

Буду очень признателен за любое решение/ новый подход к этой проблеме.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...