Перезаписать CSV-файл на S3 не удается в Pyspark - PullRequest
0 голосов
/ 12 июня 2019

Когда я загружаю данные в фрейм данных pyspark из корзины s3, затем выполняю некоторые манипуляции (объединение, объединение), а затем я пытаюсь перезаписать тот же путь ('data / csv /') , который я читал ранее.Я получаю эту ошибку:

py4j.protocol.Py4JJavaError: An error occurred while calling o4635.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 in stage 120.0 failed 4 times, most recent failure: Lost task 200.3 in stage 120.0: java.io.FileNotFoundException: Key 'data/csv/part-00000-68ea927d-1451-4a84-acc7-b91e94d0c6a3-c000.csv' does not exist in S3
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
csv_a = spark \
    .read \
    .format('csv') \
    .option("header", "true") \
    .load('s3n://mybucket/data/csv') \
    .where('some condition')

csv_b = spark \
    .read \
    .format('csv') \
    .option("header", "true") \
    .load('s3n://mybucket/data/model/csv')
    .alias('csv')

# Reading glue categories data
cc = spark \
    .sql("select * from mydatabase.mytable where month='06'") \
    .alias('cc')

# Joining and Union
output = csv_b \
    .join(cc, (csv_b.key == cc.key), 'inner') \
    .select('csv.key', 'csv.created_ts', 'cc.name', 'csv.text') \
    .drop_duplicates(['key']) \
    .union(csv_a) \
    .orderBy('name') \
    .coalesce(1) \
    .write \
    .format('csv') \
    .option('header', 'true') \
    .mode('overwrite') \
    .save('s3n://mybucket/data/csv')

Мне нужно прочитать данные из местоположения s3, затем соединиться, объединиться с другими данными и, наконец, перезаписать начальный путь, чтобы сохранить только один файл CSV с чистым соединениемdata.

Если я пытаюсь прочитать (загрузить) данные из другого пути s3, отличного от того, что мне нужно перезаписать, он работает и перезаписывается нормально.

Есть идеи, почему возникает эта ошибка?

1 Ответ

0 голосов
/ 12 июня 2019

Когда вы читаете данные из папки, изменяете их и сохраняете поверх данных, которые вы изначально прочитали, spark пытается перезаписать этот же ключ на s3 (файл в hdfs) и т. Д. *

Я нашел 2 варианта:

  1. Сохранить данные во временную папку, а затем прочитать их снова
  2. Записать в память, на диск или оба с помощью df.persist ()

Решено добавлением .persist (StorageLevel.MEMORY_AND_DISK)

output = csv_b \
    .join(cc, (csv_b.key == cc.key), 'inner') \
    .select('csv.key', 'csv.created_ts', 'cc.name', 'csv.text') \
    .drop_duplicates(['key']) \
    .union(csv_a) \
    .orderBy('name') \
    .coalesce(1) \
    .persist(StorageLevel.MEMORY_AND_DISK) \
    .write \
    .format('csv') \
    .option('header', 'true') \
    .mode('overwrite') \
    .save('s3n://mybucket/data/csv')
...