Apache Spark random forest classification on AWS EMR fails with errors: Allocation Failure, HeartbeatReceiver, YarnClusterScheduler, etc.
0 votes / 3 December 2018

I recently got started with Apache Spark on AWS. I have a dataset with 10 columns and 7 million rows, but I cannot use the whole set because Spark cannot handle it. When I take more than 1.5 million rows, the job fails on a single r4.16xlarge instance with 488 GB of RAM with out-of-memory errors (which I can confirm by watching it in top: memory consumption climbs to 100%). But when I try to run it on the whole cluster, which has considerably more memory (4 × 488 GB = 1952 GB), it also fails. I use the following parameters to launch the EMR step:

spark-submit --deploy-mode cluster --class randomforest --driver-memory 400G --num-executors 4 --executor-cores 8 --executor-memory 400g s3://spark-cluster-demo/randomforest_2.11-1.0.jar
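For reference, with Spark's default YARN memory overhead (10% of executor memory, minimum 384 MB), each 400 GB executor actually requests a container of roughly 440 GB, and the driver another ~440 GB, while YARN normally exposes less than the 488 GB of physical RAM per node. Purely as an illustrative sketch (the numbers are not tuned; on Spark 2.3+ the property is spark.executor.memoryOverhead, on older versions spark.yarn.executor.memoryOverhead), a submission that leaves explicit headroom for that overhead might look like:

spark-submit --deploy-mode cluster --class randomforest --driver-memory 300G --num-executors 4 --executor-cores 8 --executor-memory 300G --conf spark.executor.memoryOverhead=30g s3://spark-cluster-demo/randomforest_2.11-1.0.jar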

This is the Scala script inside the jar that I am running:

import org.apache.spark.SparkContext, org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

object randomforest {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("Spark Cluster Test")            
    val sc = new SparkContext(sparkConf)
    val spark = SparkSession.builder().appName("Spark Cluster Test").getOrCreate

    // Load and parse the data file, converting it to a DataFrame.
    val data = spark.read.format("libsvm").load("s3://spark-cluster-demo/2004_4000000.txt")

    val maxCategories = 512            // features with more distinct values are treated as continuous
    val numTrees = 64
    val featureSubsetStrategy = "auto" // supported featureSubsetStrategy settings: auto, all, onethird, sqrt, log2
    val impurity = "gini"
    val maxDepth = 30
    val maxBins = 2048
    val maxMemoryInMB = 409600         // memory allowed for histogram aggregation, in MB (here 400 GB)

    // Index labels, adding metadata to the label column.
    // Fit on whole dataset to include all labels in index.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)
    // Automatically identify categorical features, and index them.
    // Set maxCategories so features with > X distinct values are treated as continuous.
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(maxCategories)
      .fit(data)

    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

    // Train a RandomForest model.
    val rf = new RandomForestClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")
      .setNumTrees(numTrees)
      .setFeatureSubsetStrategy(featureSubsetStrategy)
      .setImpurity(impurity)
      .setMaxDepth(maxDepth)
      .setMaxBins(maxBins)
      .setMaxMemoryInMB(maxMemoryInMB)

    // Convert indexed labels back to original labels.
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)

    // Chain indexers and forest in a Pipeline.
    val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

    // Train model. This also runs the indexers.
    val t0 = System.nanoTime()
    val model = pipeline.fit(trainingData)
    val t1 = System.nanoTime()
    println("Training time: " + (t1 - t0) + " ns")

    // Make predictions.
    val predictions = model.transform(testData)

    // Select example rows to display.
    predictions.select("predictedLabel", "label", "features").show(5)

  }

}
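Note that MulticlassClassificationEvaluator is imported above but never used. Purely as a sketch (assuming the column names from the pipeline above), the usual accuracy check from the spark.ml examples could be added right after the predictions.select(...).show(5) call:

    // Evaluate prediction accuracy against the indexed label column.
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println("Test accuracy = " + accuracy)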

The whole log is too big to post here. I get various errors and failures, which I list below; all of them show up dozens or hundreds of times throughout the log:

2018-11-26T09:38:35.392+0000: [GC (Allocation Failure) 2018-11-26T09:38:35.392+0000: [ParNew: 559232K->36334K(629120K), 0.0217574 secs] 559232K->36334K(2027264K), 0.0218269 secs] [Times: user=0.28 sys=0.01, real=0.02 secs]

18/11/26 10:24:37 WARN TransportChannelHandler: Exception in connection from ip-172-31-17-60.ec2.internal/172.31.17.60:45589
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
18/11/26 10:24:37 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from ip-172-31-17-60.ec2.internal/172.31.17.60:45589 is closed
18/11/26 10:24:37 ERROR OneForOneBlockFetcher: Failed while starting block fetches
18/11/26 10:24:42 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)

18/11/26 10:22:29 WARN HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 141885 ms exceeds timeout 120000 ms
18/11/26 10:22:29 ERROR YarnClusterScheduler: Lost executor 2 on ip-172-31-26-87.ec2.internal: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 16.0 in stage 47.0 (TID 1079, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 4.0 in stage 47.0 (TID 1067, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 12.0 in stage 47.0 (TID 1075, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 0.0 in stage 47.0 (TID 1063, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 20.0 in stage 47.0 (TID 1083, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 8.0 in stage 47.0 (TID 1071, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 INFO TaskSetManager: Starting task 8.1 in stage 47.0 (TID 1084, ip-172-31-26-87.ec2.internal, executor 2, partition 8, PROCESS_LOCAL, 8550 bytes)
18/11/26 10:22:29 INFO TaskSetManager: Starting task 20.1 in stage 47.0 (TID 1085, ip-172-31-26-87.ec2.internal, executor 2, partition 20, PROCESS_LOCAL, 8550 bytes)

18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_2 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_10 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_6 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_18 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_22 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_1 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_14 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_5 !
18/11/26 10:40:29 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(8, ip-172-31-26-192.ec2.internal, 43287, None)
18/11/26 10:40:29 INFO BlockManagerMaster: Removed 8 successfully in removeExecutor
18/11/26 10:40:29 INFO DAGScheduler: Executor 8 added was in lost list earlier.
18/11/26 10:40:29 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 47.0 failed 4 times, most recent failure: Lost task 5.3 in stage 47.0 (TID 1117, ip-172-31-26-192.ec2.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 170206 ms
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 47.0 failed 4 times, most recent failure: Lost task 5.3 in stage 47.0 (TID 1117, ip-172-31-26-192.ec2.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 170206 ms
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1803)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1791)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1790)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1790)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:871)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2024)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1962)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:743)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:742)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:742)
        at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:563)
        at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:198)
        at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:139)
        at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:45)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:153)
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
        at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
        at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:149)
        at randomforest$.main(randomforest.scala:48)
        at randomforest.main(randomforest.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)

Is something wrong with my configuration, or is it simply not feasible to use such a large dataset for a random forest?

...