проблема запуска случайного леса с конвейером на больших данных - PullRequest
0 голосов
/ 11 апреля 2019

Я новичок в Spark Scala и запускаю код на кластере EMR.Мои данные содержат как категориальные, так и числовые значения.

У меня есть код, который работает с небольшими данными, такими как 2000 записей, но при работе с 500k записями он никогда не останавливается и время ожидания исполнителей!Я понятия не имею, откуда проблема?Это заняло много времени и все равно не повезло :( Есть идеи, в чем проблема? Мой код выглядит так:

val stringIndexer_label = new StringIndexer().setInputCol("lbl").setOutputCol("label").fit(df_data)

val deviceIndexer = new StringIndexer().setInputCol("device_id").setOutputCol("device_id_index").setHandleInvalid("keep")

val playerIndexer = new StringIndexer().setInputCol("player").setOutputCol("player_index").setHandleInvalid("keep")

val bundleIndexer = new StringIndexer().setInputCol("bundle").setOutputCol("bundle_index").setHandleInvalid("keep")

val cityIndexer = new StringIndexer().setInputCol("city").setOutputCol("city_index").setHandleInvalid("keep")

val regionIndexer = new StringIndexer().setInputCol("region").setOutputCol("region_index").setHandleInvalid("keep")

val vectorAssembler_features = new VectorAssembler().
setInputCols(Array("device_id_index", "video_duration", "exchange_id", "device_type_id", "ip", "user_agent_hash")).
setOutputCol("features")

val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features").setNumTrees(11).setMaxDepth(4).setMaxBins(699294)

val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(stringIndexer_label.labels)

val pipeline = new Pipeline().setStages(Array(stringIndexer_label, deviceIndexer, playerIndexer, bundleIndexer, cityIndexer, regionIndexer,vectorAssembler_features, rf, labelConverter))

training_data.printSchema()
training_data.select(training_data.columns.map(c => sum(col(c).isNull.cast("int")).alias(c)): _*).show
test_data.select(test_data.columns.map(c => sum(col(c).isNull.cast("int")).alias(c)): _*).show


val model_rf = pipeline.fit(training_data)```
...