Я пытаюсь применить один и тот же набор (18 или около того) преобразований к каждому столбцу (= количественная непрерывная переменная) моего кадра данных Spark (df_qc), чтобы сгенерировать новую функцию для моего упражнения по моделированию.
Исходный фрейм данных, df_qc, содержит около 700 тыс. строк и 180 столбцов.
Я запускаю цикл for для всех столбцов df_qc, в каждом раунде расширяя df_qc с помощью функции .withColumn ().Я кэширую расширенный фрейм данных после того, как все преобразования для данного столбца были применены.
Я localCheckpoint ставлю фрейм данных после каждых 10 столбцов (чтобы разбить происхождение, в противном случае я сталкиваюсь с OOM из-за огромного происхождения).
Я отслеживаю, сколько времени требуется для выполнения операций, и я заметил, что время, необходимое для выполнения набора из 18 преобразований, увеличивается, так как я добавляю все больше и больше столбцов.Это может быть связано с увеличением размера самого df, но я сомневаюсь в этом, учитывая, что этот DF все еще относительно мал.
Я использую PySpark API.Я работаю в кластере EMR с 4 узлами, рабочие узлы - машины m4.xlarge.
(1) Я пробовал несколько разных настроек:
- без localCheckpointing => resultво взрыве памяти из-за происхождения данных.
- с localCheckpointing на 10 столбцов (заняло 10 минут), 20 столбцов (заняло 16 минут), 40 столбцов (заняло 1ч24мин) ==> это «экспоненциальный» рост во времениБагс меня ...
Глядя на этапы, я вижу, что узким местом является localcheckPointing. Стадии искры
, хотя я не могу сделать это без этого ... Я хотел бы достичь состояния, в котором я могу запустить набор преобразований за примерно постоянное время для каждого столбца.
Вот воспроизводимый код: (В моем реальном случае я хотел бы запустить с nids = 100k и nqsc = 200)
from collections import defaultdict
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import numpy as np
import pandas as pd
from pyspark.sql.types import datetime, DoubleType, FloatType, IntegerType
import itertools
#create dummy input data
nmonths = 12 # number of distinct dates
nids = 1000 # number of distinct IDs
nqcs = 5 # number of columns
id = list()
for i in range (1,nids+1):
id = id + list(itertools.repeat(i, nmonths))
dummyData = {'ID':id
,'DATE':nids*[*range(1,nmonths+1)]}
for i in range(1,nqcs):
dummyData['QC_'+str(i)] = np.random.uniform(size=nmonths*nids)
pdDF = pd.DataFrame(dummyData)
df_qc = spark.createDataFrame(pdDF)
#df_qc.show()
# Add transformations
w = Window.partitionBy('ID')\
.orderBy("DATE")
counter = 1
start = datetime.datetime.now()
for col in df_qc.columns[2:]:
# 1) generate window transformations
start_in_loop = datetime.datetime.now()
df_qc = df_qc.withColumn(col+'_1', F.lag(df_qc[col], count=1).over(w))\
.withColumn(col+'_2', F.lag(df_qc[col], count=2).over(w))\
.withColumn(col+'_3', F.lag(df_qc[col], count=3).over(w))\
.withColumn(col+'_4', F.lag(df_qc[col], count=4).over(w))\
.withColumn(col+'_5', F.lag(df_qc[col], count=5).over(w))\
.withColumn(col+'_6', F.lag(df_qc[col], count=6).over(w))\
.withColumn('QC_DMAX'+col[2:], df_qc[col] - F.max(df_qc[col]).over(w))\
.withColumn('QC_DMIN'+col[2:], df_qc[col] - F.min(df_qc[col]).over(w))\
.withColumn('QC_DAVG'+col[2:], df_qc[col] - F.avg(df_qc[col]).over(w))\
.cache()
df_qc = df_qc.withColumn('QC_AD'+col[2:]+'_1', df_qc[col] - df_qc[col+'_1'])\
.withColumn('QC_AD'+col[2:]+'_3', df_qc[col] - df_qc[col+'_3'])\
.withColumn('QC_AD'+col[2:]+'_6', df_qc[col] - df_qc[col+'_6'])\
.withColumn('QC_RD'+col[2:]+'_1', (df_qc[col] - df_qc[col+'_1']) / df_qc[col+'_1'])\
.withColumn('QC_RD'+col[2:]+'_3', (df_qc[col] - df_qc[col+'_3']) / df_qc[col+'_3'])\
.withColumn('QC_RD'+col[2:]+'_6', (df_qc[col] - df_qc[col+'_6']) / df_qc[col+'_6'])\
.withColumn('QC_MA'+col[2:]+'_1', (df_qc[col] + df_qc[col+'_1']) / 2)\
.withColumn('QC_MA'+col[2:]+'_2', (df_qc[col] + df_qc[col+'_1'] + df_qc[col+'_2']) / 3)\
.withColumn('QC_MA'+col[2:]+'_3', (df_qc[col] + df_qc[col+'_1'] + df_qc[col+'_2'] + df_qc[col+'_3']) / 4)\
.cache()
# localcheckpointing (to break the lineage)
if counter%10==0:
start_checkpoint = datetime.datetime.now()
df_qc = df_qc.localCheckpoint()
print('- Time spent checkpointing :: ' + str(datetime.datetime.now() - start_checkpoint))
counter += 1
print("- Generating window tansformation | ELAPSED TIME FOR " + col + " :: " + str(datetime.datetime.now() - start_in_loop))
print("==> TOTAL ELAPSED TIME WINDOW AGGREGATIONS :: " + str(datetime.datetime.now() - start))
Любые идеи, что я делаю неправильноили как можно оптимизировать этот тип кода?
Большое спасибо!