Spark: java.lang.IllegalArgumentException: требование не выполнено, kmeans (mllib) - PullRequest
0 голосов
/ 20 мая 2018

Я пытаюсь выполнить кластеризацию с помощью kmeans.Мой набор данных: https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014#

У меня нет большого опыта работы со свечой, я работаю всего несколько месяцев, возникает ошибка, когда я пытаюсь применить kmean.train, который имеет входные данные: vector, num_clusterи итерации.

Я работаю локально, возможно ли, что моя машина не может обрабатывать столько данных?

Основной код:

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

object Preprocesado {
def main(args: Array[String]) {
val spark = SparkSession.builder.appName("Preprocesado").getOrCreate()
import spark.implicits._ 
    val sc = spark.sparkContext 

    val datos = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("input.csv")
var df= datos.select("data", "MT_001").withColumn("data", to_date($"data").cast("string")).withColumn("data", concat(lit("MT_001 "), $"data"))
val col=datos.columns

for(a<- 2 to col.size-1) {
      var user = col(a)
      println(user)
      var df_$a = datos.select("data", col(a)).withColumn("data", to_date($"data").cast("string")).withColumn("data", concat(lit(user), lit(" "),  $"data"))
       df = df.unionAll(df_$a)
      }

val rd=df.withColumnRenamed("MT_001", "values")
val df2 = rd.groupBy("data").agg(collect_list("values"))

val convertUDF = udf((array : Seq[Double]) => {
  Vectors.dense(array.toArray)
})
val withVector = df2.withColumn("collect_list(values)", convertUDF($"collect_list(values)"))
val items : Array[Double] = new Array[Double](96)
val vecToRemove = Vectors.dense(items)
def vectors_unequal(vec1: Vector) = udf((vec2: Vector) => !vec1.equals(vec2))
val filtered = withVector.filter(vectors_unequal(vecToRemove)($"collect_list(values)"))
val Array(a, b) = filtered.randomSplit(Array(0.7,0.3))
val trainingData = a.select("collect_list(values)").rdd.map{x:Row => x.getAs[Vector](0)}
val testData = b.select("collect_list(values)").rdd.map{x:Row => x.getAs[Vector](0)}
trainingData.cache()
testData.cache()
val numClusters = 4
val numIterations = 20
val clusters = KMeans.train(trainingData, numClusters, numIterations)
clusters.predict(testData).coalesce(1,true).saveAsTextFile("output")


spark.stop()
  }
}

Когда я компилирую, нетошибки.Затем я отправляю с:

spark-submit \
  --class "spark.Preprocesado.Preprocesado" \
  --master local[4] \
  --executor-memory 7g \
  --driver-memory 6g \
  target/scala-2.11/preprocesado_2.11-1.0.jar

Проблема в кластеризации: Это ошибка:

18/05/20 16:45:48 ERROR Executor: Exception in task 10.0 in stage 7.0 (TID 6347)
java.lang.IllegalArgumentException: requirement failed
    at scala.Predef$.require(Predef.scala:212)
    at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:486)
    at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:589)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:563)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:557)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:557)
    at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:580)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$2.apply(KMeans.scala:371)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$2.apply(KMeans.scala:370)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Как я могу решить эту ошибку?

Спасибо

1 Ответ

0 голосов
/ 21 мая 2018

Я думаю, что вы генерируете свой DataFrame df и, следовательно, df2 неправильно.Возможно, вы пытаетесь сделать это:

case class Data(values: Double, data: String)
var df = spark.emptyDataset[Data]

df = datos.columns.filter(_.startsWith("MT")).foldLeft(df)((df, c) => {
                val values = col(c).cast("double").as("values")
                val data = concat(lit(c), lit(" "), to_date($"_c0").cast("string")).as("data")

                df.union(datos.select(values, data).as[Data]) 
    })


val df2 = df.groupBy("data").agg(collect_list("values"))

Как мне кажется, вам нужны только два столбца: data и значения , но в цикле for высоздание DataFrame с 140256 столбцами (по одному для каждого атрибута), и, возможно, это источник ваших проблем.

pd: извините за мой английский !.

...