Я получаю сообщение об ошибке при запуске задания Spark: в нём говорится, что промежуточные файлы уже существуют. Папка или эти промежуточные файлы существуют до запуска. Я не смог найти много информации о самой ошибке в Интернете; лучшее, что я нашёл, — это совет установить spark.speculation в false, однако в моём случае это не помогло.
Кто-нибудь знает, в чём может быть причина и как это исправить?
Мой сценарий просто конвертирует TSV-файлы в файлы Parquet (и в процессе переименовывает столбцы и приводит типы).
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
... 33 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File or directory already exists at 's3://bucket/output/.emrfs_staging_0_attempt_20190923063302_0001_m_000607_9156/day=2019-09-06/a_part=__HIVE_DEFAULT_PARTITION__/part-00607-81816152-85cc-4604-b13b-9a463d4fe4a5.c000.snappy.parquet'
at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.createFile(InMemoryStagingDirectory.java:70)
at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.SynchronizedStagingDirectory.createFile(SynchronizedStagingDirectory.java:30)
at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingMetadataStore.createFile(InMemoryStagingMetadataStore.java:106)
at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.StagingUploadPlanner.plan(StagingUploadPlanner.java:61)
at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.UploadPlannerChain.plan(UploadPlannerChain.java:37)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:601)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:932)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:247)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:236)
at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:260)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
... 10 more
Правка: добавлен код; реальный код похож на этот, но в нём больше столбцов.
# Reads gzipped text lines from S3, extracts the JSON payload that follows a
# leading timestamp-like token on each line, and appends the result to a
# partitioned Parquet dataset.
# NOTE(review): SparkContext, SparkSession, sql, type, sf and regexp_extract are
# not imported in this snippet; they are presumably pyspark imports made elsewhere.
sc = SparkContext()
ss = SparkSession.builder.getOrCreate()
sqlContext = sql.SQLContext(sc)

raw_rdd = sc.textFile('s3://bucket/path/*/*.gz')
raw_df = sqlContext.createDataFrame(raw_rdd, type.StringType())

# Capture group 2 is the JSON object that follows the first token of the line.
# A raw string keeps the pattern identical while avoiding Python's
# invalid-escape-sequence warning for \-, \. and \{.
raw_json_df = raw_df.withColumn("json", regexp_extract(raw_df.value, r'([0-9\-TZ\.:]+) (\{.*)', 2))
raw_json_df = raw_json_df.drop('value')
df = sqlContext.read.json(raw_json_df.rdd.map(lambda r: r.json))

# Source column name -> type it must have in the output.
column_types = [
    ('col1', type.BooleanType()),
    ('col2', type.StringType()),
    ('col3', type.IntegerType()),
]

# Add any column missing from the inferred JSON schema as a typed NULL *before*
# selecting: selectExpr fails on an unknown column, so checking afterwards
# (as the original did) could never take effect. The original also selected
# from an undefined name `data_df`; the parsed frame is `df`.
new_df = df
for col_name, col_type in column_types:
    if col_name not in new_df.columns:
        new_df = new_df.withColumn(col_name, sf.lit(None).cast(col_type))
new_df = new_df.selectExpr('col1', 'col2', 'col3')

# Cast every column to its target type and publish it under an `out_` prefix.
changed_df = new_df
for col_name, col_type in column_types:
    changed_df = changed_df.withColumn('out_' + col_name, changed_df[col_name].cast(col_type)).drop(col_name)

# Partition by the renamed column: `col2` itself was dropped above, so
# partitionBy('col2') would fail because the column no longer exists.
changed_df.write.mode('append').partitionBy('out_col2').parquet('s3://bucket/out-path/')
Правка 2: конфигурация EMR (с emrfs; ошибка воспроизводится и без секции emrfs-site):
[
{
"classification":"emrfs-site",
"properties":{
"fs.s3.consistent.retryPeriodSeconds":"60",
"fs.s3.consistent":"true",
"fs.s3.consistent.retryCount":"5",
"fs.s3.consistent.metadata.tableName":"EmrFSMetadata"
}
},
{
"configurations":[
{
"classification":"export",
"properties":{
"PYSPARK_PYTHON":"/usr/bin/python3"
}
}
],
"classification":"spark-env",
"properties":{
}
},
{
"classification":"spark-defaults",
"properties":{
"spark.executor.memory":"18000M",
"spark.driver.memory":"18000M",
"spark.yarn.scheduler.reporterThread.maxFailures":"5",
"spark.yarn.driver.memoryOverhead":"3000M",
"spark.executor.heartbeatInterval":"60s",
"spark.rdd.compress":"true",
"spark.network.timeout":"800s",
"spark.executor.cores":"5",
"spark.speculation":"false",
"spark.shuffle.spill.compress":"true",
"spark.shuffle.compress":"true",
"spark.storage.level":"MEMORY_AND_DISK_SER",
"spark.default.parallelism":"240",
"spark.executor.extraJavaOptions":"-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent\u003d35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError\u003d\u0027kill -9 %p\u0027",
"spark.executor.instances":"120",
"spark.yarn.executor.memoryOverhead":"3000M",
"spark.dynamicAllocation.enabled":"false",
"spark.driver.extraJavaOptions":"-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent\u003d35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError\u003d\u0027kill -9 %p\u0027"
}
},
{
"classification":"yarn-site",
"properties":{
"yarn.nodemanager.pmem-check-enabled":"false",
"yarn.nodemanager.vmem-check-enabled":"false"
}
}
]