I am trying to run k-means clustering with Spark. My dataset: https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014#
I don't have much experience with Spark (I have only been working with it for a few months), and I get an error when I call KMeans.train, which takes as input an RDD[Vector], the number of clusters, and the number of iterations.
I am running locally; could it be that my machine cannot handle this much data?
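For reference, this is the call shape I am using (a minimal, self-contained sketch with made-up numbers, assuming a SparkContext sc; this is not my real pipeline):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Toy illustration of the call: KMeans.train takes an RDD[Vector],
// the number of clusters and the number of iterations. All vectors
// here share the same dimension.
val toy = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0),
  Vectors.dense(1.0, 1.0),
  Vectors.dense(9.0, 8.0)
))
val toyModel = KMeans.train(toy, 2, 10)
println(toyModel.clusterCenters.mkString(", "))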
Main code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
object Preprocesado {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("Preprocesado").getOrCreate()
    import spark.implicits._
    val sc = spark.sparkContext

    // Read the semicolon-separated CSV; the first column "data" is the
    // timestamp, the remaining columns hold one consumption series per client
    val datos = spark.read.format("csv")
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .load("input.csv")

    // Start with the first client, keying each row by "<client> <date>"
    var df = datos.select("data", "MT_001")
      .withColumn("data", to_date($"data").cast("string"))
      .withColumn("data", concat(lit("MT_001 "), $"data"))

    // Stack every remaining client column into the same two-column layout
    // (index 0 is "data", index 1 is "MT_001", already handled above)
    val col = datos.columns
    for (a <- 2 to col.size - 1) {
      var user = col(a)
      println(user)
      var df_a = datos.select("data", col(a))
        .withColumn("data", to_date($"data").cast("string"))
        .withColumn("data", concat(lit(user), lit(" "), $"data"))
      df = df.unionAll(df_a)
    }

    // Collect the readings of each client-day into a single list
    val rd = df.withColumnRenamed("MT_001", "values")
    val df2 = rd.groupBy("data").agg(collect_list("values"))

    // Convert each list into a dense MLlib vector
    val convertUDF = udf((array: Seq[Double]) => Vectors.dense(array.toArray))
    val withVector = df2.withColumn("collect_list(values)", convertUDF($"collect_list(values)"))

    // Drop rows whose vector equals the 96-dimensional zero vector
    // (days with no consumption)
    val items: Array[Double] = new Array[Double](96)
    val vecToRemove = Vectors.dense(items)
    def vectors_unequal(vec1: Vector) = udf((vec2: Vector) => !vec1.equals(vec2))
    val filtered = withVector.filter(vectors_unequal(vecToRemove)($"collect_list(values)"))

    // 70/30 train/test split, extracted as RDD[Vector] for MLlib
    val Array(a, b) = filtered.randomSplit(Array(0.7, 0.3))
    val trainingData = a.select("collect_list(values)").rdd.map { x: Row => x.getAs[Vector](0) }
    val testData = b.select("collect_list(values)").rdd.map { x: Row => x.getAs[Vector](0) }
    trainingData.cache()
    testData.cache()

    // Train k-means and save the cluster assignments of the test set
    val numClusters = 4
    val numIterations = 20
    val clusters = KMeans.train(trainingData, numClusters, numIterations)
    clusters.predict(testData).coalesce(1, true).saveAsTextFile("output")

    spark.stop()
  }
}
It compiles with no errors. Then I submit it with:
spark-submit \
--class "spark.Preprocesado.Preprocesado" \
--master local[4] \
--executor-memory 7g \
--driver-memory 6g \
target/scala-2.11/preprocesado_2.11-1.0.jar
The problem appears in the clustering step. This is the error:
18/05/20 16:45:48 ERROR Executor: Exception in task 10.0 in stage 7.0 (TID 6347)
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:212)
at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:486)
at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:589)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:563)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:557)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:557)
at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:580)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$2.apply(KMeans.scala:371)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$2.apply(KMeans.scala:370)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
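If I read the stack trace correctly, the require that fails inside MLUtils.fastSquaredDistance checks that the two vectors being compared have the same size, so I suspect that collect_list returns lists of different lengths for some groups, which would give me vectors of different dimensions. This is the check I would run against my trainingData RDD (a sketch):

import org.apache.spark.mllib.linalg.Vector

// Sketch: list the distinct vector dimensions in the training set.
// k-means can only work if this prints a single size (96 in my case);
// more than one size would explain the "requirement failed" error.
val sizes = trainingData.map((v: Vector) => v.size).distinct().collect()
println(sizes.sorted.mkString(", "))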
How can I fix this error?
Thanks