У меня есть модель XGBoost, обученная Python API, и теперь я просто хочу использовать ее в спарке для прогнозирования огромных данных, которые составляют около 300 миллионов. Я просто использую loadModel API Scala XGBoost для загрузки модели в драйвер Spark, а затем передаю модель всем исполнителям для выполнения операции прогнозирования. Я использую 500 исполнителей, и 4G памяти и 1 ядро на каждого исполнителя, но это слишком медленно, что не может быть принято. Это может быть 15 часов до конца.
Но когда я тестирую на одном jvm, он очень быстро может предсказать, по крайней мере, более 200 записей в секунду, поэтому задание Spark должно быть выполнено всего за час при использовании 500 ядер, но почему это так медленно?
код как показано ниже:
sc.addFile("hdfs://xxxxxxx/model/mash/data/my_train.model")
val fpd30Model = XGBoost.loadModel(SparkFiles.get("my_train.model"))
val model30Bdc = sc.broadcast(fpd30Model)
val resultRdd = sc.textFile("hdfs://xxxxxxx/model/mash/data/30_features/").repartition(5000)
.map(row=>{
val data = row.split("\t")
val miId = data(0)
val features = data(1).split(",").map(row=>row.toFloat)
val score = model30Bdc.value.predict(new DMatrix(features, 1, 225, Float.NaN))(0)(0)
Array(miId, score).mkString("\t")
}).repartition(100)
val output = "hdfs://xxxxxxx/model/mash/score/30_score/"
HdfsIo.removePath(sc, output)
resultRdd.saveAsTextFile(output)