Я пытаюсь начать работу с кластеризацией k-средних в Spark, основываясь на этом примере . Пример кода говорит:
val conf = new SparkConf().setAppName("KMeansExample")
val sc = new SparkContext(conf)
// $example on$
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
Но когда я запускаю это на своем кластере, он просто печатает
20/04/22 17:46:12 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
снова и снова. В большинстве ответов на веб-сайтах говорится, что это означает, что я запрашиваю слишком много ресурсов, но это тривиальная программа (кластеризация шести точек данных), поэтому я подумал, что, может быть, я просто неправильно ее настраиваю. Я сравнил с более ранней программой Spark, которая успешно работала на моей установке, и она создала контекст по-другому, используя построитель сеансов. Поэтому я изменил код на:
val spark = SparkSession.builder.appName("kmeans").getOrCreate()
// Load and parse the data
val data = spark.read.textFile("/project/sample_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
Но теперь я получаю ошибку во время компиляции:
[error] /s/chopin/a/grad/jstrout/project/goes-clustering-study/kmeans-complete/src/main/scala/kmeans.scala:18: Unable to find encoder for type org.apache.spark.mllib.linalg.Vector. An implicit Encoder[org.apache.spark.mllib.linalg.Vector] is needed to store org.apache.spark.mllib.linalg.Vector instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[error] val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
[error] ^
Я не понимаю, почему он был скомпилирован раньше, но не в этой версии.
Есть какие-нибудь подсказки?