у нас есть большой процесс ETL, запущенный в кластере EMR, который читает и записывает большое количество файлов паркета в контейнеры S3
Вот код:
a = spark.read.parquet(path1)
a.registerTempTable('a')
b = spark.read.parquet(path2)
b.registerTempTable('b')
c = spark.read.parquet(path3)
c.registerTempTable('c')
sql = '''
select
a.col1,
a.col2,
b.col1,
b.col2,
c.col1,
c.col2,
a.dt
from
a
join
b
on
a.dt = b.dt
join
c
on
a.dt = c.dt
''''
df_out = spark.sql(sql)
df_out.repartition('dt').write.parquet( path_out, partitionBy='dt', mode='overwrite')
Недавно мы имеличтобы перейти к переходным кластерам и, следовательно, пришлось начать использовать последовательный вид.Я размещаю настройки нашего сайта ERMFS ниже:
{
"fs.s3.enableServerSideEncryption": "true",
"fs.s3.consistent": "false",
"fs.s3.consistent.retryPeriodSeconds": "10",
"fs.s3.serverSideEncryption.kms.keyId": "xxxxxx",
"fs.s3.consistent.retryCount": "5",
"fs.s3.consistent.metadata.tableName": "xxxxx",
"fs.s3.consistent.throwExceptionOnInconsistency": "true"
}
Выполнение того же кода с той же конфигурацией искры - которая работает на постоянном кластере - на переходном кластере с включенным согласованным представлением приводит к ошибке.
...
19/02/25 23:01:23 DEBUG S3NativeFileSystem: getFileStatus could not find key 'xxxxxREDACTEDxxxx'
19/02/25 23:01:23 DEBUG S3NativeFileSystem: Delete called for 'xxxxxREDACTEDxxxx' but file does not exist, so returning false
19/02/25 23:01:23 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=465, src=/var/log/spark/apps/application_1551126537652_0003.inprogress, packetSize=65016, chunksPerPacket=126, bytesCurBlock=25074688
19/02/25 23:01:23 DEBUG DFSClient: DFSClient flush(): bytesCurBlock=25081892 lastFlushOffset=25075161 createNewBlock=false
19/02/25 23:01:23 DEBUG DFSClient: Queued packet 465
19/02/25 23:01:23 DEBUG DFSClient: Waiting for ack for: 465
19/02/25 23:01:23 DEBUG DFSClient: DataStreamer block BP-75703405-10.13.32.237-1551126523840:blk_1073741876_1052 sending packet packet seqno: 465 offsetInBlock: 25074688 lastPacketInBlock: false lastByteOffsetInBlock: 25081892
19/02/25 23:01:23 DEBUG DFSClient: DFSClient seqno: 465 reply: SUCCESS downstreamAckTimeNanos: 0 flag: 0
Traceback (most recent call last):
File "xxxxxREDACTEDxxxx", line 112, in <module>
main()
File "xxxxxREDACTEDxxxx", line xxxxxREDACTEDxxxx, in main
xxxxxREDACTEDxxxx
File "xxxxxREDACTEDxxxx", line 70, in main
partitionBy='dt', mode='overwrite')
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o232.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
Я подозреваю, что ошибка связана с настройкой EMRFS, но я не смог найти настройки EMRFS, которая бы работала.Единственное, что приводит к тому, что эта ошибка не выдается во время выполнения кода выше, - это увеличение числа узлов в два раза по сравнению с обычным числом.Ошибка также не выдается, если я уменьшу количество данных.
Изменение выходных данных и спекуляция не помогут.
Большое спасибо.Я буду очень рад любым идеям / предложениям.