Scala Ошибка Spark: java .lang.IllegalArgumentException: поле «col1» не существует - PullRequest
0 голосов
/ 08 мая 2020

Пытаюсь подогнать модель xgboost через спарк. Вот моя минимально воспроизводимая реализация. task-config содержит поля для анализа и параметры модели.

  private var spark: SparkSession = _
  private val taskConfig = JsonLoader.parseJsonConfig[TaskConfig]("/task-config/local_train.json")
  private val Some(groupId: String) = this.taskConfig.taskInfo.groupColumn
  private val Some(ranking: String) = this.taskConfig.taskInfo.rankingColumn
  private val numPartition = 1000

  private val spark = SparkSession
                       .builder()
                       .master("local[*]")
                       .appName("test")
                       .getOrCreate()

В производстве я использую SparkSession. Как herby, я использую csv вместо запроса SQL. Я решил оставить его как точку входа.

val data = this.spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .load(this.taskConfig.training.db + this.taskConfig.training.table)

Кажется, что чтение работает нормально, когда я распечатываю свою схему data.schema.fields Я вижу список своих полей: Array(StructField(col1 col2 col3 ...)

Исключение выдается, когда я пытаюсь передать данные в конвейер

  private def pipeline: Pipeline = {
    val num_features = getFeaturesByType(this.taskConfig, "numeric")
    val numFeatureAssembler = new VectorAssembler()
      .setInputCols(num_features)
      .setOutputCol("num_features")

    val pipelineStages = Array(numFeatureAssembler)
    new Pipeline().setStages(pipelineStages)
  }

val pipelineModel = Some(pipeline.fit(data))

Мне не удалось обнаружить ту же проблему, пока выполняя SQL запросов, поэтому я думаю, мне следует глубже изучить правильный формат вывода считывателя данных. Тем не менее, любые советы приветствуются.

...