Как распределить код и набор данных на рабочие узлы? - PullRequest
1 голос
/ 29 июня 2019

Я работал с набором данных Movielens (20 миллионов записей) и использовал совместную фильтрацию в Spark MLlib.

steps of algorithm

Моя среда - Ubuntu 14.4 на VirtualBox.У меня есть один главный узел и 2 подчиненных узла.Я использовал выпущенные Apache Hadoop, Apache Spark, Scala, sbt.Код написан на Scala.

Как распределить код и набор данных по рабочим узлам?

import java.lang.Math._

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object trainModel extends App {

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("trainModel")
  val sc = new SparkContext(conf)

  val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")

  val sqlContext = new SQLContext(sc)
  val df = sqlContext
    .read
    .option("header", "true")
    .format("csv")
    .load("file:///usr/local/spark/dataset/rating.csv")

  val ratings = rawData.map(line => line.split(",").take(3) match {
    case Array(userId, movieId, rating) => 
      Rating(userId.toInt, movieId.toInt, rating.toFloat)
  })
  println(s"Number of Ratings in Movie file ${ratings.count()} \n")

  val ratingsRDD = sc.textFile("file:///usr/local/spark/dataset/rating.csv")
  //split data into test&train
  val splits = ratingsRDD.randomSplit(Array(0.8, 0.2), seed = 12345)
  val trainingRatingsRDD = splits(0).cache()
  val testRatingsRDD = splits(1).cache()
  val numTraining = trainingRatingsRDD.count()
  val numTest = testRatingsRDD.count()
  println(s"Training: $numTraining, test: $numTest.")

  val rank = 10
  val lambdas = 0.01
  val numIterations = 10
  val model = ALS.train(ratings, rank, numIterations)
  //Evaluate the model on training data
  val userProducts = ratings.map { case Rating(userId, movieId, rating) =>
    (userId, movieId)
  }
  val predictions = model.predict(userProducts).map { case
    Rating(userId, movieId, rating) =>
    ((userId, movieId), rating)
  }
  val ratesAndPreds = ratings.map { case Rating(userId, movieId, rating) =>
    ((userId, movieId),
      rating)
  }.join(predictions)
  val meanSquaredError = ratesAndPreds.map { case ((userId, movieId),
  (r1, r2)) =>
    val err = r1 - r2
    err * err
  }.mean
  println("Mean Squared Error= " + meanSquaredError)
  sqrt(meanSquaredError)
  val rmse = math.sqrt(meanSquaredError)
  println(s" RMSE = $rmse.")
}

Ответы [ 2 ]

1 голос
/ 04 июля 2019

Как распространять код

Это происходит, когда вы spark-submit используете приложение Spark. Распределение может быть по ядру / потоку процессора или исполнителям. Вам не нужно кодировать это. Вот почему люди используют Spark, как это должно произойти (почти) автоматически.

conf.setMaster ( "местный [*]")

Это говорит о том, что вы используете одного исполнителя с таким количеством потоков, сколько у вас ядер ЦП. Это локальный дистрибутив.

Вам лучше удалить строку из кода и использовать вместо нее spark-submit --master. Прочитайте официальную документацию, особенно Подача заявок .

... и набор данных в рабочие узлы? val rawData = sc.textFile ("file: ///usr/local/spark/dataset/rating.csv")

В строке указано, как распределяется набор данных Movielens (rating.csv). Это не имеет никакого отношения к Spark, поскольку Spark использует любой дистрибутив в файловой системе.

Другими словами, в Hadoop HDFS с размером блока 256 МБ ( split ) файл, вдвое превышающий размер блока, доступен в 2 частях. Это HDFS, чтобы сделать файл распределенным и отказоустойчивым.

Когда Spark считывает файл с двумя разделениями, распределенные вычисления (описанные с использованием RDD) будут использовать 2 раздела и, таким образом, 2 задачи.

HDFS - это файловая система / хранилище, поэтому выберите любое место и hdfs -put набор данных. Думайте о HDFS как о любой файловой системе, к которой у вас есть удаленный доступ. Используйте местоположение в качестве входного параметра sc.textFile, и все готово.

1 голос
/ 29 июня 2019

1 - Ваш набор данных лучше всего поместить в распределенную файловую систему - Hadoop HDFS, S3 и т. Д.

2 - Код распространяется с помощью скрипта spark-submit, как описано здесь https://spark.apache.org/docs/2.4.3/submitting-applications.html

...