Я написал реализацию pyspark для чтения строки за строкой для последовательного (и рекурсивного) умножения значения столбца в последовательности. Из-за ограничений платформы с нашей стороны мне нужно преобразовать это в Scala сейчас без UDAF. Я посмотрел на эту реализацию , но это занимает много времени, так как число year_months растет, так как ему нужно # временных таблиц как # year_months.
Существует около 100 year_months и 70 отделов, в которых общее количество строк в этом кадре данных равно 7000. Нам нужно взять начальное значение (по первому месяцу в последовательности) для каждого отдела и умножить его на значение следующей строки. Полученный умноженный коэффициент необходимо умножить на следующую строку и т. Д.
Пример данных:
department, productivity_ratio, year_month
101,1.00,2013-01-01
101,0.98,2013-02-01
101,1.01,2013-03-01
101,0.99,2013-04-01
...
102,1.00,2013-01-01
102,1.02,2013-02-01
102,0.96,2013-03-01
...
Ожидаемый результат:
department,productivity_ratio,year_month,chained_productivity_ratio
101,1.00,2013-01-01,1.00
101,0.98,2013-02-01,0.98 (1.00*0.98)
101,1.01,2013-03-01,0.9898 (1.00*0.98*1.01)
101,0.99,2013-04-01,0.9799 (1.00*0.98*1.01*0.99)
...
102,1.00,2013-01-01,1.00 (reset to 1.00 as starting point as department name changed in sequence)
102,1.02,2013-02-01,1.02 (1.00*1.02)
102,0.96,2013-03-01,0.9792 (1.00*1.02*0.96)
...
Есть ли способ чтобы реализовать это более быстрым способом в scala, либо преобразовав это в al oop по отделам, и посмотрев на производительность_отношения как последовательность для умножения на предыдущее значение, либо изменив фрейм данных в другую структуру данных, чтобы избежать проблем с распределенной последовательностью .
Существующий код pyspark:
%pyspark
import pandas as pd
import numpy as np
import StringIO
inputParquet = "s3://path/to/parquet/files/"
inputData = spark.read.parquet(inputParquet)
inputData.printSchema
root
|-- department: string
|-- productivity_ratio: double
|-- year_month: date
inputSorted=inputData.sort('department', 'year_month')
inputSortedNotnull=inputSorted.dropna()
finalInput=inputSortedNotnull.toPandas()
prev_dept = 999
prev_productivity_ratio = 1
new_productivity_chained = []
for t in finalInput.itertuples():
if prev_dept == t[1]:
new_productivity_chained.append(t[2] * prev_productivity_ratio)
prev_productivity_ratio = t[2] * prev_productivity_ratio
else:
prev_productivity_ratio = 1
new_productivity_chained.append(prev_productivity_ratio)
prev_dept = t[1]
productivityChained = finalInput.assign(chained_productivity=new_productivity_chained)