Я использую кластер Spark K-means на кластерах AWS EMR. Набор данных содержит 10 ^ 7 строк и 9 столбцов объектов. Когда я загружаю данные непосредственно в Spark в виде файла .CSV из корзины S3, я получаю ожидаемые результаты, и время выполнения уменьшается по мере увеличения количества узлов в кластере. Однако, когда я получаю доступ к одним и тем же данным из Phoenix / HBase, я, по-видимому, теряю всю параллелизацию и получаю оптимальную производительность от одного узла, а также снижаю производительность по мере увеличения узлов. Код, который я использую для доступа к таблице Phoenix, выглядит следующим образом:
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.{SparkSession, SQLContext, DataFrame}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.linalg.{Vector, Vectors, DenseVector, VectorUDT}
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector}
sc.stop()
val configuration = new Configuration()
val sc = new SparkContext("local", "phoenix-load")
val sqlContext = new SQLContext(sc)
val df7 = sqlContext.phoenixTableAsDataFrame(
"SPARK_DATA7", Array("ROWKEY","META.DUR","META.AVG_AMP", "META.AVG_POW","META.PAPR","META.SNR","META.SNR_DB","META.BW_3DB","META.BW_10DB","META.BWNE"), conf = configuration)
val dfConvert7 = df7.selectExpr("cast(ROWKEY as integer) as ROWKEY","cast(DUR as double) as DUR", "cast(AVG_AMP as double) as AVG_AMP", "cast(AVG_POW as double) as AVG_POW", "cast(PAPR as double) as PAPR", "cast(SNR as double) as SNR", "cast(SNR_DB as double) as SNR_DB", "cast(BW_3DB as double) as BW_3DB", "cast(BW_10DB as double) as BW_10DB", "cast(BWNE as double) as BWNE")
Есть ли лучший / иной способ получения доступа к данным в HBase или Phoenix, который будет правильно распараллеливаться в Spark?