Пытаюсь подогнать модель xgboost через спарк. Вот моя минимально воспроизводимая реализация. task-config
содержит поля для анализа и параметры модели.
private var spark: SparkSession = _
private val taskConfig = JsonLoader.parseJsonConfig[TaskConfig]("/task-config/local_train.json")
private val Some(groupId: String) = this.taskConfig.taskInfo.groupColumn
private val Some(ranking: String) = this.taskConfig.taskInfo.rankingColumn
private val numPartition = 1000
private val spark = SparkSession
.builder()
.master("local[*]")
.appName("test")
.getOrCreate()
В производстве я использую SparkSession
. Как herby, я использую csv
вместо запроса SQL. Я решил оставить его как точку входа.
val data = this.spark.read
.format("csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load(this.taskConfig.training.db + this.taskConfig.training.table)
Кажется, что чтение работает нормально, когда я распечатываю свою схему data.schema.fields
Я вижу список своих полей: Array(StructField(col1 col2 col3 ...)
Исключение выдается, когда я пытаюсь передать данные в конвейер
private def pipeline: Pipeline = {
val num_features = getFeaturesByType(this.taskConfig, "numeric")
val numFeatureAssembler = new VectorAssembler()
.setInputCols(num_features)
.setOutputCol("num_features")
val pipelineStages = Array(numFeatureAssembler)
new Pipeline().setStages(pipelineStages)
}
val pipelineModel = Some(pipeline.fit(data))
Мне не удалось обнаружить ту же проблему, пока выполняя SQL запросов, поэтому я думаю, мне следует глубже изучить правильный формат вывода считывателя данных. Тем не менее, любые советы приветствуются.