Spark: Как построить полуаддитивные метрики или совокупную сумму по части столбца? - PullRequest
0 голосов
/ 17 мая 2019

Я пытаюсь воспроизвести некоторые аналитики, которые я делаю в традиционной BI в рамках spark.Используемый технический термин - как строить полуаддитивные метрики, но может помочь, если я объясню, что это значит.

Например, скажем, у меня есть список сумм запасов на каждый день.Вчера у меня было 100, сегодня у меня 50. Это полуаддитивная метрика, потому что у вас нет 150. У вас есть 50. Таким образом, вы хотите подвести итоги только за самый текущий день.Но что-то вроде продаж будет полностью аддитивным, и вы будете суммировать, например, все продажи за весь год.

Итак, вопрос в том, как построить полуаддитивную метрику, используя agg и sum?И как мне написать выражение agg, которое показало как поладдитивные, так и полностью аддитивные метрики вместе?Например:

val stocks = (Seq(
             ("2019-05-01", 1, "FB", 1058.45, 100000),
             ("2019-05-01", 1, "NVDA", 40058.45, 150000),
             ("2019-05-03", 1, "FB", 8058.45, 80000),
             ("2019-05-04", 1, "FB", 11058.45, 75000),  // Latest FB entry for account 1 
             ("2019-05-05", 1, "NVDA", 50058.45, 125000),  // Latest NVDA entry for account 1
             ("2019-05-01", 2, "FB", 1058.45, 200000),
             ("2019-05-02", 2, "NVDA", 5058.45, 125000),
             ("2019-05-03", 2, "NVDA", 5058.45, 115000),
             ("2019-05-05", 2, "FB", 1058.45, 65000),   // latest FB entry for account 2
             ("2019-05-06", 2, "NVDA", 5058.45, 105000)  // latest NVDA entry for account 2
          ).toDF("date", "symbol", "account", "sale", "current_holdings"))

 stocks
     .groupBy( stocks.col("symbol") )
     .add( sum("earnings"), sum("current_holdings") )
     .show()

Что это даст:

+------+---------+----------------+
|symbol|sale     |current_holdings|
+------+---------+----------------+
|    FB| 34291.80|        520000.0|
|  NDVA|105292.20|        525500.0|
+------+---------+----------------+

Должно дать:

+------+---------+----------------+
|symbol|sale     |current_holdings|
+------+---------+----------------+
|    FB| 34291.80|          140000|
|  NDVA|105292.20|          230000|
+------+---------+----------------+

В ожидаемой разнице только в столбце current_holdings, которыйбудет суммировать все последние записи по всем счетам.Таким образом, добавляя последние записи для FB, вы получаете:

FB = 75000 + 65000
NVDA = 125000 + 105000

Я смотрел на WindowFunctions, но я не вижу, как указать условия суммы, кроме конкретных индексов в разделах, и этобыло бы трудно, если бы я сказал, что мне нужно сложить все за определенный месяц.Как это сделать со Spark?

PS: Пожалуйста, извините за странный пример, который мне пришлось адаптировать для публичного просмотра.

PSS: Я также сделал это довольно сложно, потому что последняя датадля каждого аккаунта / символа не совпадает с предсказуемой границей.В моей конкретной ситуации я на самом деле пытаюсь суммировать вещи, которые относятся только к последнему месяцу данного периода времени (год, квартал и т. Д.).Я ожидаю, что это будет более простая ситуация, но я хотел полностью изучить полуаддитивные варианты использования, поэтому я усугубил проблему.

1 Ответ

1 голос
/ 18 мая 2019

Решение PySpark, которое может быть изменено на эквивалентный Scala код.

Использование row_number для нумерации строк для каждой учетной записи, символа в порядке даты desc и суммирования удерживающей стоимости строки first для группы.

w=Window.partitionBy(stocks.account,stocks.symbol).orderBy(stocks.date.desc())
stocks = stocks.withColumn('rnum',row_number().over(w))
w1 = Window.partitionBy(stocks.symbol)
stocks = stocks.withColumn('sales',sum(stocks.sale).over(w1)).withColumn('holdings',sum(when(stocks.rnum==1,stocks.current_holdings).otherwise(0)).over(w1))
#Final selection
stocks.select(stocks.symbol,stocks.sales,stocks.holdings).distinct().show() 
...