У меня есть некоторый код pyspark, который выглядит следующим образом:
my_df = pd.read_csv('data.csv') # dataframe is 1,000X5
for i in range(15):
my_df = pd.concat([my_df, my_df]) # after this loop dataframe is 32,768,000X5
# This takes a very long time
spark_my_df = spark.createDataFrame(my_df)
df_columns = spark_stock_df.columns
assembler = VectorAssembler(inputCols=df_columns, outputCol="features")
spark_my_df = assembler.transform(spark_my_df)
train, test = spark_stock_df.randomSplit([0.8, 0.2], seed = 1)
dt = SparkDecisionTreeClassifier(featuresCol = 'features',
labelCol = 'Target',
maxMemoryInMB=2048,
minInstancesPerNode=2)
# Running out of memory here
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
И я пытаюсь запустить этот код в EMR на AWS с 4 узлами m5.2xlarge (1 master, 3 core) с помощью следующей команды с AWSCLI:
aws2 emr create-cluster \
--name "Spark cluster with step" \
--release-label emr-5.29.0 \
--applications Name=Spark \
--log-uri s3://logs/logs/ \
--ec2-attributes KeyName=the-key \
--instance-type m5.2xlarge \
--instance-count 4 \
--bootstrap-actions Path=s3://scripts-and-set-up/bootstrap_file.sh \
--configuration file://config.json \
--steps Name="Command Runner",Jar="command-runner.jar",Args=["spark-submit",\
"--conf","spark.executor.cores=4",\
"--conf","spark.executor.memory=16g",\
"--conf","spark.driver.memory=8g",\
"--conf","spark.rpc.message.maxSize=1500",\
"--deploy-mode=cluster",\
"s3://workflow-scripts/process_data.py"\
] \
--use-default-roles \
--auto-terminate
Однако процесс построения фрейма данных искры очень длительный, и процесс подбора дерева решений дает сбой из-за недостатка памяти. Я чувствую, что делаю что-то очень неправильное, потому что понимаю, что спарк должен быть масштабируемым, чтобы я мог решить мою проблему с 4 большими узлами m5.2
Чего мне не хватает?