Оптимизация операций по столбцам в цикле for над фреймом данных Spark в PySpark - PullRequest
0 голосов
/ 23 сентября 2019

Я пытаюсь применить один и тот же набор (18 или около того) преобразований к каждому столбцу (= количественная непрерывная переменная) моего кадра данных Spark (df_qc), чтобы сгенерировать новую функцию для моего упражнения по моделированию.

Исходный фрейм данных, df_qc, содержит около 700 тыс. строк и 180 столбцов.

Я запускаю цикл for для всех столбцов df_qc, в каждом раунде расширяя df_qc с помощью функции .withColumn ().Я кэширую расширенный фрейм данных после того, как все преобразования для данного столбца были применены.

Я localCheckpoint ставлю фрейм данных после каждых 10 столбцов (чтобы разбить происхождение, в противном случае я сталкиваюсь с OOM из-за огромного происхождения).

Я отслеживаю, сколько времени требуется для выполнения операций, и я заметил, что время, необходимое для выполнения набора из 18 преобразований, увеличивается, так как я добавляю все больше и больше столбцов.Это может быть связано с увеличением размера самого df, но я сомневаюсь в этом, учитывая, что этот DF все еще относительно мал.

Я использую PySpark API.Я работаю в кластере EMR с 4 узлами, рабочие узлы - машины m4.xlarge.

(1) Я пробовал несколько разных настроек:

  • без localCheckpointing => resultво взрыве памяти из-за происхождения данных.
  • с localCheckpointing на 10 столбцов (заняло 10 минут), 20 столбцов (заняло 16 минут), 40 столбцов (заняло 1ч24мин) ==> это «экспоненциальный» рост во времениБагс меня ...

Глядя на этапы, я вижу, что узким местом является localcheckPointing. Стадии искры

, хотя я не могу сделать это без этого ... Я хотел бы достичь состояния, в котором я могу запустить набор преобразований за примерно постоянное время для каждого столбца.

Вот воспроизводимый код: (В моем реальном случае я хотел бы запустить с nids = 100k и nqsc = 200)

from collections import defaultdict
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import numpy as np
import pandas as pd
from pyspark.sql.types import datetime, DoubleType, FloatType, IntegerType
import itertools

#create dummy input data
nmonths  = 12     # number of distinct dates
nids     = 1000  # number of distinct IDs 
nqcs     = 5    # number of columns

id = list()
for i in range (1,nids+1):
    id = id + list(itertools.repeat(i, nmonths))

dummyData = {'ID':id
             ,'DATE':nids*[*range(1,nmonths+1)]}

for i in range(1,nqcs):
    dummyData['QC_'+str(i)] = np.random.uniform(size=nmonths*nids)

pdDF = pd.DataFrame(dummyData) 
df_qc = spark.createDataFrame(pdDF)

#df_qc.show()

# Add transformations
w = Window.partitionBy('ID')\
          .orderBy("DATE")

counter = 1
start = datetime.datetime.now()
for col in df_qc.columns[2:]:

    # 1) generate window transformations
    start_in_loop = datetime.datetime.now()
    df_qc = df_qc.withColumn(col+'_1', F.lag(df_qc[col], count=1).over(w))\
                 .withColumn(col+'_2', F.lag(df_qc[col], count=2).over(w))\
                 .withColumn(col+'_3', F.lag(df_qc[col], count=3).over(w))\
                 .withColumn(col+'_4', F.lag(df_qc[col], count=4).over(w))\
                 .withColumn(col+'_5', F.lag(df_qc[col], count=5).over(w))\
                 .withColumn(col+'_6', F.lag(df_qc[col], count=6).over(w))\
                 .withColumn('QC_DMAX'+col[2:], df_qc[col] - F.max(df_qc[col]).over(w))\
                 .withColumn('QC_DMIN'+col[2:], df_qc[col] - F.min(df_qc[col]).over(w))\
                 .withColumn('QC_DAVG'+col[2:], df_qc[col] - F.avg(df_qc[col]).over(w))\
                 .cache()

    df_qc = df_qc.withColumn('QC_AD'+col[2:]+'_1', df_qc[col] - df_qc[col+'_1'])\
                 .withColumn('QC_AD'+col[2:]+'_3', df_qc[col] - df_qc[col+'_3'])\
                 .withColumn('QC_AD'+col[2:]+'_6', df_qc[col] - df_qc[col+'_6'])\
                 .withColumn('QC_RD'+col[2:]+'_1', (df_qc[col] - df_qc[col+'_1']) / df_qc[col+'_1'])\
                 .withColumn('QC_RD'+col[2:]+'_3', (df_qc[col] - df_qc[col+'_3']) / df_qc[col+'_3'])\
                 .withColumn('QC_RD'+col[2:]+'_6', (df_qc[col] - df_qc[col+'_6']) / df_qc[col+'_6'])\
                 .withColumn('QC_MA'+col[2:]+'_1', (df_qc[col] + df_qc[col+'_1']) / 2)\
                 .withColumn('QC_MA'+col[2:]+'_2', (df_qc[col] + df_qc[col+'_1'] + df_qc[col+'_2']) / 3)\
                 .withColumn('QC_MA'+col[2:]+'_3', (df_qc[col] + df_qc[col+'_1'] + df_qc[col+'_2'] + df_qc[col+'_3']) / 4)\
                 .cache()

    # localcheckpointing (to break the lineage)
    if counter%10==0:
        start_checkpoint = datetime.datetime.now()
        df_qc = df_qc.localCheckpoint()
        print('- Time spent checkpointing :: ' + str(datetime.datetime.now() - start_checkpoint))

    counter += 1

    print("- Generating window tansformation | ELAPSED TIME FOR " + col + " :: " + str(datetime.datetime.now() - start_in_loop))   
print("==> TOTAL ELAPSED TIME WINDOW AGGREGATIONS :: " + str(datetime.datetime.now() - start))

Любые идеи, что я делаю неправильноили как можно оптимизировать этот тип кода?

Большое спасибо!

...