Производительность Spark K-means уменьшается с увеличением количества узлов / экземпляров - PullRequest
0 голосов
/ 23 октября 2019

Я использую Spark MLLib для выполнения K-средних кластеров в AWS EMR. Набор данных имеет порядок 10 ^ 6 строк с 9 характерными столбцами. Размер экземпляра, который я использую, имеет 8vCPU и 32 ГБ памяти.

Я ожидал, что при увеличении количества узлов в кластере я увеличу производительность (уменьшу время выполнения) со стороны Spark, однако яЯ получаю противоположные результаты.

Я получаю МНОГОВУЮ производительность (более высокое время выполнения) с БОЛЕЕ рабочими узлами / экземплярами, чем с одним рабочим узлом. У меня были те же результаты с кластерами из 5, 10 и 15 рабочих узлов;с увеличением количества узлов производительность снижается. Я попытался изменить разделы (spark.sql.shuffle.partitions) и использовал различные конфигурации ядер исполнителей, количество исполнителей и память исполнителя.

Мой код указан ниже (количество исполнителей на 10 рабочих узлов):

spark-shell --executor-cores 3 num-executors 20 --executor-memory 10G

import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._ 
import org.apache.spark.sql.functions._
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.{SparkSession, SQLContext, DataFrame}
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.ml.linalg.{Vector, Vectors, DenseVector, VectorUDT}
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector}

sc.stop()
val configuration = new Configuration()
val sc = new SparkContext("local", "phoenix-load")
val sqlContext = new SQLContext(sc) 

//dataset is loaded in from Phoenix table and set as featureDf6
//dataset is made up of all numerical values (DOUBLE)

val columns = Array("DUR","AVG_AMP","AVG_POW","PAPR","SNR","SNR_DB","BW_3DB","BW_10DB","BWNE")    
val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("featuresin")
val df = assembler.transform(featureDf6)

val scaler = new MinMaxScaler().setInputCol("featuresin").setOutputCol("features").setMin(-1).setMax(1)
val scalerModel = scaler.fit(df)
val scaledData = scalerModel.transform(df)

val kmeans = new KMeans().setK(14).setSeed(1L).setMaxIter(1000)
val model = kmeans.fit(scaledData)

1 Ответ

0 голосов
/ 23 октября 2019

Я обнаружил, что причиной проблемы был метод, в котором Spark считывал данные из Phoenix / HBase. Когда я загрузил набор данных непосредственно в Spark, результаты оказались такими, как ожидалось, и по мере увеличения количества узлов время выполнения уменьшилось. Я опубликую еще один вопрос, чтобы определить ошибку в процессе чтения из Феникса.

...