Почему после чтения файла я получаю разные результаты в зависимости от того, как был создан контекст Spark? - PullRequest
0 голосов
/ 24 апреля 2020

Я пытаюсь начать работу с кластеризацией k-средних в Spark, основываясь на этом примере . Пример кода говорит:

    val conf = new SparkConf().setAppName("KMeansExample")
    val sc = new SparkContext(conf)

    // $example on$
    // Load and parse the data
    val data = sc.textFile("data/mllib/kmeans_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

Но когда я запускаю это на своем кластере, он просто печатает

20/04/22 17:46:12 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

снова и снова. В большинстве ответов на веб-сайтах говорится, что это означает, что я запрашиваю слишком много ресурсов, но это тривиальная программа (кластеризация шести точек данных), поэтому я подумал, что, может быть, я просто неправильно ее настраиваю. Я сравнил с более ранней программой Spark, которая успешно работала на моей установке, и она создала контекст по-другому, используя построитель сеансов. Поэтому я изменил код на:

    val spark = SparkSession.builder.appName("kmeans").getOrCreate()

    // Load and parse the data
    val data = spark.read.textFile("/project/sample_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

Но теперь я получаю ошибку во время компиляции:

[error] /s/chopin/a/grad/jstrout/project/goes-clustering-study/kmeans-complete/src/main/scala/kmeans.scala:18: Unable to find encoder for type org.apache.spark.mllib.linalg.Vector. An implicit Encoder[org.apache.spark.mllib.linalg.Vector] is needed to store org.apache.spark.mllib.linalg.Vector instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[error]     val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
[error]                              ^

Я не понимаю, почему он был скомпилирован раньше, но не в этой версии.

Есть какие-нибудь подсказки?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...