У меня есть искровое задание, которое я полностью сократил до:
spark.read.option("delimiter", delimiter)
.schema(Encoders.product[MyData].schema)
.csv("s3://bucket/data/*/*.gz")
.as[MyData]
, чтобы изолировать ошибку, и оно все еще дает мне java.lang.OutOfMemoryError
при работе в AWS EMR на YARN.Общий размер файла составляет приблизительно 4,7 ГБ в сжатом виде (размер каждого раздела составляет от 1 до 20 КБ);общее количество строк = 373 063 082.
Схема MyData (запутанная):
case class MyData(field1: Long, field2: String, field3: Int, field4: Float, field5: Float, field6: Option[Int] = None, field7: Option[Int])
Странно то, что задание полностью работает во всех следующих случаях:
- В гораздо большем наборе данных (70 ГБ в сжатом виде);каждый отдельный файл раздела имеет примерно тот же размер, что и файлы раздела в меньшем наборе данных.
- Для каждой половины файлов по отдельности;то есть я выполнил одно задание на
s3://bucket/data/2017*/*.gz
, а другое на s3://bucket/data/2018*/*.gz
, и оба успешно. - На моей локальной машине с помощью
master("local[*]")
.Единственное отличие состоит в том, что в кластере используется YARN (протестировано с MASTER: 1 x m4.2xlarge, CORE: 25 x m4.2xlarge, TASK: 25 x m4.2xlarge
и с более мелкими конфигурациями): все они потерпели неудачу.
В журналах stderr я получаю:
[Stage 0:===============================================> (9536 + 411) / 10000]
[Stage 0:=================================================>(9825 + 175) / 10000]
[Stage 0:==================================================>(9964 + 36) / 10000]
[Stage 0:===================================================>(9992 + 8) / 10000]
[Stage 0:===================================================>(9997 + 3) / 10000]
[Stage 0:===================================================>(9998 + 2) / 10000]
[Stage 0:===================================================>(9999 + 1) / 10000]
Command exiting with ret '137'
А затем Spark-UI зависает около 9999 / 10000.
Я также набрал s3://bucket/data/201[7-8]*/*.gz
, чтобы посмотреть, не является ли проблема регулярного выражения захватом большего количества файлов, чем предполагалось.В конечном итоге он выдал те же ошибки.
Наконец, я также проверил Ganglia, чтобы попытаться выяснить, что происходит, и не увидел ничего, что бросилось бы в глаза.
Мой кластерКоманда развертывания (информация помечена звездочкой):
aws emr create-cluster --name $NAME --release-label emr-5.12.0 \
--log-uri s3://bucket/logs/ \
--instance-fleets InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m4.xlarge}'] \
InstanceFleetType=CORE,TargetSpotCapacity=25,InstanceTypeConfigs=['{InstanceType=m4.xlarge,BidPrice=0.2,WeightedCapacity=1}'],LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=120,TimeoutAction=SWITCH_TO_ON_DEMAND}'} \
InstanceFleetType=TASK,TargetSpotCapacity=25,InstanceTypeConfigs=['{InstanceType=m4.xlarge,BidPrice=0.2,WeightedCapacity=1}'],LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=120,TimeoutAction=SWITCH_TO_ON_DEMAND}'} \
--ec2-attributes KeyName="*****",SubnetId=subnet-d******* --use-default-roles \
--applications Name=Spark Name=Ganglia \
--steps Type=CUSTOM_JAR,Name=CopyAppFromS3,ActionOnFailure=CONTINUE,Jar="command-runner.jar",Args=[aws,s3,cp,s3://bucket/assembly-0.1.0.jar,/home/hadoop] \
Type=Spark,Name=MyApp,ActionOnFailure=CONTINUE,Args=[/home/hadoop/assembly-0.1.0.jar] --configurations file://$CONFIG_FILE --auto-terminate
Я хотел бы понять, почему spark не может прочитать меньший набор данных, когда он может прочитать тот, который в 15 раз больше (та же конфигурация кластера), и почему он работаетна моей локальной машине, но не на AWS, и, наконец, почему он работает на обеих половинах по отдельности, но не вместе.Какие данные могут вызвать это?Что я могу сделать, чтобы решить эту проблему или избежать ее в будущем?
РЕДАКТИРОВАТЬ: Локальный компьютер представляет собой MacBook Pro Retina 15-дюймовый 2015 с 2,8 ГГц Intel Core i7, 16 ГБ оперативной памяти и 1 ТБ SSD.
EDIT2: я тоже получил это в stderr один раз:
18/05/28 16:15:49 ERROR SparkContext: Exception getting thread dump from executor 1
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.SparkContext.getExecutorThreadDump(SparkContext.scala:607)
at org.apache.spark.ui.exec.ExecutorThreadDumpPage.render(ExecutorThreadDumpPage.scala:40)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689)
at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:171)
at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581)
at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:524)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)