Я пытаюсь прочитать некоторые таблицы (файлы паркета), выполнить несколько соединений и записать их в формате паркета в S3, но я получаю сообщение об ошибке или на запись таблицы у меня уходит больше двух часов.
ошибка:
An error was encountered:
Invalid status code '400' from https://.... with error payload: {"msg":"requirement failed: session isn't active."}
Я могу записывать другие таблицы как паркет, кроме этой таблицы.
Это мой пример кода:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.config("spark.sql.catalogImplementation", "in-memory").getOrCreate()
table1 = spark.read.parquet("s3://.../table1")
table1.createOrReplaceTempView("table1")
table2 = spark.read.parquet("s3://.../table2")
table2.createOrReplaceTempView("table2")
table3 = spark.read.parquet("s3://.../table3")
table3.createOrReplaceTempView("table3")
table4 = spark.read.parquet("s3://.../table4")
table4.createOrReplaceTempView("table4")
Final_table = spark.sql("""
select
a.col1
a.col2
...
d.coln
from
table1 a
left outer join
table2 b
on
cond1
cond2
cond3
left outer join
table3 c
on
...
""")
Final_table.count()
# 3813731240
output_file="s3://.../final_table/"
final_table.write.option("partitionOverwriteMode", "dynamic").mode('overwrite').partitionBy("col1").parquet(output_file)
Просто чтобы добавить больше, я пробовал переделать, но не сработало. Кроме того, я пробовал использовать различные кластеры EMR, такие как Cluster1: Master m5.24xlarge
Cluster2: Master m5.24xlarge 1 ядро m5.24xlarge
Cluster3: Master m5d.2xlarge 8 ядер m5d .2xlarge
Версия выпуска EMR 5.29.0