рекурсивное преобразование цепочек строк в pyspark в scala - PullRequest
0 голосов
/ 04 апреля 2020

Я написал реализацию pyspark для чтения строки за строкой для последовательного (и рекурсивного) умножения значения столбца в последовательности. Из-за ограничений платформы с нашей стороны мне нужно преобразовать это в Scala сейчас без UDAF. Я посмотрел на эту реализацию , но это занимает много времени, так как число year_months растет, так как ему нужно # временных таблиц как # year_months.

Существует около 100 year_months и 70 отделов, в которых общее количество строк в этом кадре данных равно 7000. Нам нужно взять начальное значение (по первому месяцу в последовательности) для каждого отдела и умножить его на значение следующей строки. Полученный умноженный коэффициент необходимо умножить на следующую строку и т. Д.

Пример данных:

department, productivity_ratio, year_month
101,1.00,2013-01-01
101,0.98,2013-02-01
101,1.01,2013-03-01
101,0.99,2013-04-01
...
102,1.00,2013-01-01
102,1.02,2013-02-01
102,0.96,2013-03-01
...

Ожидаемый результат:

department,productivity_ratio,year_month,chained_productivity_ratio
101,1.00,2013-01-01,1.00
101,0.98,2013-02-01,0.98   (1.00*0.98)
101,1.01,2013-03-01,0.9898 (1.00*0.98*1.01)
101,0.99,2013-04-01,0.9799 (1.00*0.98*1.01*0.99)
...
102,1.00,2013-01-01,1.00   (reset to 1.00 as starting point as department name changed in sequence)
102,1.02,2013-02-01,1.02   (1.00*1.02)
102,0.96,2013-03-01,0.9792 (1.00*1.02*0.96)
...

Есть ли способ чтобы реализовать это более быстрым способом в scala, либо преобразовав это в al oop по отделам, и посмотрев на производительность_отношения как последовательность для умножения на предыдущее значение, либо изменив фрейм данных в другую структуру данных, чтобы избежать проблем с распределенной последовательностью .

Существующий код pyspark:

%pyspark
import pandas as pd
import numpy as np
import StringIO

inputParquet = "s3://path/to/parquet/files/"
inputData = spark.read.parquet(inputParquet)
inputData.printSchema

root
|-- department: string
|-- productivity_ratio: double
|-- year_month: date

inputSorted=inputData.sort('department', 'year_month')
inputSortedNotnull=inputSorted.dropna()
finalInput=inputSortedNotnull.toPandas()

prev_dept = 999
prev_productivity_ratio = 1

new_productivity_chained = []

for t in finalInput.itertuples():
    if prev_dept == t[1]:
        new_productivity_chained.append(t[2] * prev_productivity_ratio)
        prev_productivity_ratio = t[2] * prev_productivity_ratio
    else:
        prev_productivity_ratio = 1
        new_productivity_chained.append(prev_productivity_ratio)
    prev_dept = t[1]

productivityChained = finalInput.assign(chained_productivity=new_productivity_chained)

1 Ответ

1 голос
/ 04 апреля 2020

Вы можете использовать функцию window lag и выполнить exp(sum(log(<column>))) для вычисления chained_productivity_ratio, и все функции, которые мы используем: spark inbuilt functions производительность будет отличной!


Example:

In Pyspark:

df.show()
#+----------+------------------+----------+
#|department|productivity_ratio|year_month|
#+----------+------------------+----------+
#|       101|              1.00|2013-01-01|
#|       101|              0.98|2013-02-01|
#|       101|              1.01|2013-03-01|
#|       101|              0.99|2013-04-01|
#|       102|              1.00|2013-01-01|
#|       102|              1.02|2013-02-01|
#|       102|              0.96|2013-03-01|
#+----------+------------------+----------+

from pyspark.sql.functions import *
from pyspark.sql import Window

w = Window.partitionBy("department").orderBy("year_month")

df.withColumn("chained_productivity_ratio",exp(sum(log(col("productivity_ratio"))).over(w))).show()
#+----------+------------------+----------+--------------------------+
#|department|productivity_ratio|year_month|chained_productivity_ratio|
#+----------+------------------+----------+--------------------------+
#|       101|              1.00|2013-01-01|                       1.0|
#|       101|              0.98|2013-02-01|                      0.98|
#|       101|              1.01|2013-03-01|                    0.9898|
#|       101|              0.99|2013-04-01|        0.9799019999999999|
#|       102|              1.00|2013-01-01|                       1.0|
#|       102|              1.02|2013-02-01|                      1.02|
#|       102|              0.96|2013-03-01|                    0.9792|
#+----------+------------------+----------+--------------------------+

In Scala:

import org.apache.spark.sql.functions._

import org.apache.spark.sql.expressions._

val w = Window.partitionBy("department").orderBy("year_month")

df.withColumn("chained_productivity_ratio",exp(sum(log(col("productivity_ratio"))).over(w))).show()

//+----------+------------------+----------+--------------------------+
//|department|productivity_ratio|year_month|chained_productivity_ratio|
//+----------+------------------+----------+--------------------------+
//|       101|              1.00|2013-01-01|                       1.0|
//|       101|              0.98|2013-02-01|                      0.98|
//|       101|              1.01|2013-03-01|                    0.9898|
//|       101|              0.99|2013-04-01|        0.9799019999999999|
//|       102|              1.00|2013-01-01|                       1.0|
//|       102|              1.02|2013-02-01|                      1.02|
//|       102|              0.96|2013-03-01|                    0.9792|
//+----------+------------------+----------+--------------------------+
...