Я использую Spark MLLib для выполнения K-средних кластеров в AWS EMR. Набор данных имеет порядок 10 ^ 6 строк с 9 характерными столбцами. Размер экземпляра, который я использую, имеет 8vCPU и 32 ГБ памяти.
Я ожидал, что при увеличении количества узлов в кластере я увеличу производительность (уменьшу время выполнения) со стороны Spark, однако яЯ получаю противоположные результаты.
Я получаю МНОГОВУЮ производительность (более высокое время выполнения) с БОЛЕЕ рабочими узлами / экземплярами, чем с одним рабочим узлом. У меня были те же результаты с кластерами из 5, 10 и 15 рабочих узлов;с увеличением количества узлов производительность снижается. Я попытался изменить разделы (spark.sql.shuffle.partitions) и использовал различные конфигурации ядер исполнителей, количество исполнителей и память исполнителя.
Мой код указан ниже (количество исполнителей на 10 рабочих узлов):
spark-shell --executor-cores 3 num-executors 20 --executor-memory 10G
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.{SparkSession, SQLContext, DataFrame}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.linalg.{Vector, Vectors, DenseVector, VectorUDT}
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector}
sc.stop()
val configuration = new Configuration()
val sc = new SparkContext("local", "phoenix-load")
val sqlContext = new SQLContext(sc)
//dataset is loaded in from Phoenix table and set as featureDf6
//dataset is made up of all numerical values (DOUBLE)
val columns = Array("DUR","AVG_AMP","AVG_POW","PAPR","SNR","SNR_DB","BW_3DB","BW_10DB","BWNE")
val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("featuresin")
val df = assembler.transform(featureDf6)
val scaler = new MinMaxScaler().setInputCol("featuresin").setOutputCol("features").setMin(-1).setMax(1)
val scalerModel = scaler.fit(df)
val scaledData = scalerModel.transform(df)
val kmeans = new KMeans().setK(14).setSeed(1L).setMaxIter(1000)
val model = kmeans.fit(scaledData)