I recently started working with Apache Spark on AWS. I have a dataset with 10 columns and 7 million rows, but I cannot use the whole set because Spark fails to process it. When I take more than 1.5 million rows, the job crashes on a single r4.16xlarge instance with 488 GB of RAM with out-of-memory errors (which I can confirm by watching it in top: memory consumption climbs to 100%). But when I try to run it on the whole cluster, which has considerably more memory (4 × 488 GB = 1952 GB), it also fails. I launch the EMR step with the following parameters:
spark-submit --deploy-mode cluster --class randomforest --driver-memory 400G --num-executors 4 --executor-cores 8 --executor-memory 400g s3://spark-cluster-demo/randomforest_2.11-1.0.jar
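(For context on what the cluster actually ends up with: the snippet below is not part of my job, it is just an illustrative sketch of how the memory-related settings could be printed from the driver at runtime, in case the submitted flags are not what takes effect.)

// Illustrative sketch only, not in the submitted jar: print the effective
// memory/executor settings as resolved by Spark at runtime.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
Seq("spark.driver.memory",
    "spark.executor.memory",
    "spark.executor.instances",
    "spark.executor.cores").foreach { key =>
  println(s"$key = ${spark.conf.getOption(key).getOrElse("<not set>")}")
}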
This is the Scala code inside the jar that I am executing:
import org.apache.spark.SparkContext, org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
object randomforest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Spark Cluster Test")
    val sc = new SparkContext(sparkConf)
    val spark = SparkSession.builder().appName("Spark Cluster Test").getOrCreate

    // Load and parse the data file, converting it to a DataFrame.
    val data = spark.read.format("libsvm").load("s3://spark-cluster-demo/2004_4000000.txt")

    val maxCategories = 512
    val numTrees = 64
    val featureSubsetStrategy = "auto" // supported featureSubsetStrategy settings: auto, all, onethird, sqrt, log2
    val impurity = "gini"
    val maxDepth = 30
    val maxBins = 2048
    val maxMemoryInMB = 409600

    // Index labels, adding metadata to the label column.
    // Fit on whole dataset to include all labels in index.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)

    // Automatically identify categorical features, and index them.
    // Set maxCategories so features with > X distinct values are treated as continuous.
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(maxCategories)
      .fit(data)

    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

    // Train a RandomForest model.
    val rf = new RandomForestClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")
      .setNumTrees(numTrees)
      .setFeatureSubsetStrategy(featureSubsetStrategy)
      .setImpurity(impurity)
      .setMaxDepth(maxDepth)
      .setMaxBins(maxBins)
      .setMaxMemoryInMB(maxMemoryInMB)

    // Convert indexed labels back to original labels.
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)

    // Chain indexers and forest in a Pipeline.
    val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

    // Train model. This also runs the indexers.
    val t0 = System.nanoTime()
    val model = pipeline.fit(trainingData)
    val t1 = System.nanoTime()
    println("Training time: " + (t1 - t0) + " ns")

    // Make predictions.
    val predictions = model.transform(testData)

    // Select example rows to display.
    predictions.select("predictedLabel", "label", "features").show(5)
  }
}
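For completeness, a small diagnostic sketch like the one below (not in the jar; it assumes the same `data` DataFrame loaded above) would show how many rows are actually loaded and across how many partitions they are spread:

// Sketch only, assuming the `data` DataFrame from the job above:
// basic size and partitioning information before training.
println(s"row count: ${data.count()}")
println(s"partitions: ${data.rdd.getNumPartitions}")
data.printSchema()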
The whole log is too large to post here. I am getting various errors and failures, which I list below; all of them appear dozens or hundreds of times throughout the log:
2018-11-26T09:38:35.392+0000: [GC (Allocation Failure) 2018-11-26T09:38:35.392+0000: [ParNew: 559232K->36334K(629120K), 0.0217574 secs] 559232K->36334K(2027264K), 0.0218269 secs] [Times: user=0.28 sys=0.01, real=0.02 secs]
18/11/26 10:24:37 WARN TransportChannelHandler: Exception in connection from ip-172-31-17-60.ec2.internal/172.31.17.60:45589
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
18/11/26 10:24:37 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from ip-172-31-17-60.ec2.internal/172.31.17.60:45589 is closed
18/11/26 10:24:37 ERROR OneForOneBlockFetcher: Failed while starting block fetches
18/11/26 10:24:42 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)
18/11/26 10:22:29 WARN HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 141885 ms exceeds timeout 120000 ms
18/11/26 10:22:29 ERROR YarnClusterScheduler: Lost executor 2 on ip-172-31-26-87.ec2.internal: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 16.0 in stage 47.0 (TID 1079, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 4.0 in stage 47.0 (TID 1067, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 12.0 in stage 47.0 (TID 1075, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 0.0 in stage 47.0 (TID 1063, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 20.0 in stage 47.0 (TID 1083, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 WARN TaskSetManager: Lost task 8.0 in stage 47.0 (TID 1071, ip-172-31-26-87.ec2.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 141885 ms
18/11/26 10:22:29 INFO TaskSetManager: Starting task 8.1 in stage 47.0 (TID 1084, ip-172-31-26-87.ec2.internal, executor 2, partition 8, PROCESS_LOCAL, 8550 bytes)
18/11/26 10:22:29 INFO TaskSetManager: Starting task 20.1 in stage 47.0 (TID 1085, ip-172-31-26-87.ec2.internal, executor 2, partition 20, PROCESS_LOCAL, 8550 bytes)
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_2 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_10 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_6 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_18 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_22 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_1 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_14 !
18/11/26 10:40:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_36_5 !
18/11/26 10:40:29 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(8, ip-172-31-26-192.ec2.internal, 43287, None)
18/11/26 10:40:29 INFO BlockManagerMaster: Removed 8 successfully in removeExecutor
18/11/26 10:40:29 INFO DAGScheduler: Executor 8 added was in lost list earlier.
18/11/26 10:40:29 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 47.0 failed 4 times, most recent failure: Lost task 5.3 in stage 47.0 (TID 1117, ip-172-31-26-192.ec2.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 170206 ms
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 47.0 failed 4 times, most recent failure: Lost task 5.3 in stage 47.0 (TID 1117, ip-172-31-26-192.ec2.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 170206 ms
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1803)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1791)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1790)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1790)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:871)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2024)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1962)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:743)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:742)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:742)
at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:563)
at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:198)
at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:139)
at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:45)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:153)
at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:149)
at randomforest$.main(randomforest.scala:48)
at randomforest.main(randomforest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
Is something wrong with my configuration, or is it simply not feasible to use a dataset this large for a random forest?