Task failed while writing rows. File already exists
1 vote
/ 23 September 2019

I'm getting an error when running a Spark job saying that intermediate files already exist. Neither the folder nor these intermediate files existed before the run. I couldn't find much about this error online; the best lead was that setting spark.speculation to false might help, but in my case it did not. Does anyone know what could cause this / how to fix it? My script simply converts TSVs to Parquet files (doing some column renaming / type casting along the way).
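
For completeness, this is roughly how speculation gets turned off at session startup (it mirrors the spark-defaults entry in the EMR config below); the app name here is just a placeholder:

from pyspark.sql import SparkSession

# Hedged sketch: disable speculative execution when building the session.
# Equivalent to "spark.speculation": "false" in spark-defaults.
spark = SparkSession.builder \
    .appName('tsv-to-parquet') \
    .config('spark.speculation', 'false') \
    .getOrCreate()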

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    ... 33 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File or directory already exists at 's3://bucket/output/.emrfs_staging_0_attempt_20190923063302_0001_m_000607_9156/day=2019-09-06/a_part=__HIVE_DEFAULT_PARTITION__/part-00607-81816152-85cc-4604-b13b-9a463d4fe4a5.c000.snappy.parquet'
    at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.createFile(InMemoryStagingDirectory.java:70)
    at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.SynchronizedStagingDirectory.createFile(SynchronizedStagingDirectory.java:30)
    at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingMetadataStore.createFile(InMemoryStagingMetadataStore.java:106)
    at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.StagingUploadPlanner.plan(StagingUploadPlanner.java:61)
    at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.UploadPlannerChain.plan(UploadPlannerChain.java:37)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:601)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:932)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:247)
    at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
    at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:236)
    at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:260)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
    ... 10 more

Edit: added code; it is similar to this but with more columns

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as sf
from pyspark.sql import types
from pyspark.sql.functions import regexp_extract

sc = SparkContext()
ss = SparkSession.builder.getOrCreate()
sqlContext = SQLContext(sc)

# Read the raw gzipped logs as a single string column ("value").
raw_rdd = sc.textFile('s3://bucket/path/*/*.gz')
raw_df = sqlContext.createDataFrame(raw_rdd, types.StringType())

# Strip the leading timestamp; keep only the JSON payload.
raw_json_df = raw_df.withColumn(
    "json", regexp_extract(raw_df.value, r'([0-9\-TZ\.:]+) (\{.*)', 2))
raw_json_df = raw_json_df.drop('value')

# Parse the JSON strings into a structured DataFrame.
df = sqlContext.read.json(raw_json_df.rdd.map(lambda r: r.json))
new_df = df.selectExpr('col1', 'col2', 'col3')

# Backfill columns that can be absent in a batch with typed NULLs
# (the real job selects many more columns than shown here).
if 'col1' not in new_df.columns:
    new_df = new_df.withColumn('col1', sf.lit(None).cast(types.BooleanType()))

if 'col2' not in new_df.columns:
    new_df = new_df.withColumn('col2', sf.lit(None).cast(types.StringType()))

if 'col3' not in new_df.columns:
    new_df = new_df.withColumn('col3', sf.lit(None).cast(types.IntegerType()))

# Cast and rename to the output schema.
changed_df = new_df.withColumn('out_col1', new_df['col1'].cast(types.BooleanType())).drop('col1')
changed_df = changed_df.withColumn('out_col2', changed_df['col2'].cast(types.StringType())).drop('col2')
changed_df = changed_df.withColumn('out_col3', changed_df['col3'].cast(types.IntegerType())).drop('col3')

# 'col2' was dropped above, so the write has to partition by its renamed form.
changed_df.write.mode('append').partitionBy('out_col2').parquet('s3://bucket/out-path/')
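
One detail from the trace worth noting: a_part=__HIVE_DEFAULT_PARTITION__ means rows with a NULL partition key all land in one fallback partition. A hedged sketch, building on changed_df above, that fills the NULLs with an explicit placeholder ('unknown' is arbitrary) to make that partition visible and rule it out as the source of the staging-file collision:

from pyspark.sql import functions as sf

# Replace NULL partition keys before the write so no rows fall into
# the __HIVE_DEFAULT_PARTITION__ bucket.
filled_df = changed_df.withColumn(
    'out_col2', sf.coalesce(changed_df['out_col2'], sf.lit('unknown')))
filled_df.write.mode('append').partitionBy('out_col2').parquet('s3://bucket/out-path/')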

Edit 2: EMR config (with EMRFS; the run without EMRFS had no emrfs-site section)

[ 
   { 
      "classification":"emrfs-site",
      "properties":{ 
         "fs.s3.consistent.retryPeriodSeconds":"60",
         "fs.s3.consistent":"true",
         "fs.s3.consistent.retryCount":"5",
         "fs.s3.consistent.metadata.tableName":"EmrFSMetadata"
      }
   },
   { 
      "configurations":[ 
         { 
            "classification":"export",
            "properties":{ 
               "PYSPARK_PYTHON":"/usr/bin/python3"
            }
         }
      ],
      "classification":"spark-env",
      "properties":{ 

      }
   },
   { 
      "classification":"spark-defaults",
      "properties":{ 
         "spark.executor.memory":"18000M",
         "spark.driver.memory":"18000M",
         "spark.yarn.scheduler.reporterThread.maxFailures":"5",
         "spark.yarn.driver.memoryOverhead":"3000M",
         "spark.executor.heartbeatInterval":"60s",
         "spark.rdd.compress":"true",
         "spark.network.timeout":"800s",
         "spark.executor.cores":"5",
         "spark.speculation":"false",
         "spark.shuffle.spill.compress":"true",
         "spark.shuffle.compress":"true",
         "spark.storage.level":"MEMORY_AND_DISK_SER",
         "spark.default.parallelism":"240",
         "spark.executor.extraJavaOptions":"-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent\u003d35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError\u003d\u0027kill -9 %p\u0027",
         "spark.executor.instances":"120",
         "spark.yarn.executor.memoryOverhead":"3000M",
         "spark.dynamicAllocation.enabled":"false",
         "spark.driver.extraJavaOptions":"-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent\u003d35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError\u003d\u0027kill -9 %p\u0027"
      }
   },
   { 
      "classification":"yarn-site",
      "properties":{ 
         "yarn.nodemanager.pmem-check-enabled":"false",
         "yarn.nodemanager.vmem-check-enabled":"false"
      }
   }
]
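
For debugging only, a hedged variant of the emrfs-site block with consistent view turned off; the com.amazon.ws.emr.hadoop.fs.staging frames in the trace point at the consistent-view staging layer, so a throwaway run with this setting would show whether that layer is involved (not a production setting):

{
   "classification":"emrfs-site",
   "properties":{
      "fs.s3.consistent":"false"
   }
}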

1 Answer

1 vote
/ 23 September 2019

If you run into the same problem, try using EMRFS when creating the EMR cluster (https://docs.aws.amazon.com/en_us/emr/latest/ManagementGuide/emr-fs.html).
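
A hedged sketch of what that looks like programmatically with boto3; the cluster name, release label, and instance settings are placeholders, and the emrfs-site properties mirror the config from the question:

import boto3

# Hedged sketch: launch an EMR cluster with EMRFS consistent view
# enabled from the start via the emrfs-site classification.
emr = boto3.client('emr')
emr.run_job_flow(
    Name='tsv-to-parquet',                      # placeholder name
    ReleaseLabel='emr-5.26.0',                  # placeholder release
    Applications=[{'Name': 'Spark'}],
    Instances={
        'MasterInstanceType': 'm5.xlarge',
        'SlaveInstanceType': 'm5.xlarge',
        'InstanceCount': 3,
        'KeepJobFlowAliveWhenNoSteps': False,
    },
    Configurations=[{
        'Classification': 'emrfs-site',
        'Properties': {
            'fs.s3.consistent': 'true',
            'fs.s3.consistent.metadata.tableName': 'EmrFSMetadata',
        },
    }],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)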
