Установите nThread в кластере Spark для XGBoost - PullRequest
0 голосов
/ 12 сентября 2018

API = ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

val xgbParam = Map ("eta" -> 0.1f,

  "max_depth" -> 2,

     "objective" -> "multi:softprob",

     "num_class" -> 3,

     "num_round" -> 100,

     "num_workers" -> 2)

Я выполняю задание, которое не будет работать, пока количество потоков API не будет равно значению num_worker, установленному для Spark.

Итак, в режиме master = local, когда я делаю --master local [n], а также устанавливаю num_worker для этого API в качестве того же значения, что и n, он работает.

Но в кластере я не знаю, какой параметр контролировать, который точно принимает вызов обработки количества потоков. Я пытался с -

1) spark.task.cpus
2) spark.default.parallelism
3) executor cores

Но ни одна из них не работает, и особенность этой проблемы в том, что она останавливается при распространении модели XGBoost, если вышеуказанное условие не выполняется.

Мой код выглядит следующим образом, он работает в локальном режиме, но не в кластере, любая помощь?

Код:

import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType

val schema = new StructType(Array(
  StructField("sepal length", DoubleType, true),
  StructField("sepal width", DoubleType, true),
  StructField("petal length", DoubleType, true),
  StructField("petal width", DoubleType, true),
  StructField("class", StringType, true)))
val rawInput = spark.read.schema(schema).csv("file:///appdata/bblite-data/iris.csv")
import org.apache.spark.ml.feature.StringIndexer

val stringIndexer = new StringIndexer().
  setInputCol("class").
  setOutputCol("classIndex").
  fit(rawInput)
val labelTransformed = stringIndexer.transform(rawInput).drop("class")

import org.apache.spark.ml.feature.VectorAssembler
val vectorAssembler = new VectorAssembler().
  setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")).
  setOutputCol("features")
val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
val xgbParam = Map("eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2)
val xgbClassifier = new XGBoostClassifier(xgbParam).
      setFeaturesCol("features").
      setLabelCol("classIndex")
val xgbClassificationModel = xgbClassifier.fit(xgbInput)
...