Ошибка записи паркетного файла в s3 с помощью Pyspark - PullRequest
0 голосов
/ 10 июля 2020

Я пытаюсь прочитать некоторые таблицы (файлы паркета), выполнить несколько соединений и записать их в формате паркета в S3, но я получаю сообщение об ошибке или на запись таблицы у меня уходит больше двух часов.

ошибка:


    An error was encountered:
    Invalid status code '400' from https://.... with error payload: {"msg":"requirement failed: session isn't active."}

Я могу записывать другие таблицы как паркет, кроме этой таблицы.

Это мой пример кода:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.config("spark.sql.catalogImplementation", "in-memory").getOrCreate()

table1 = spark.read.parquet("s3://.../table1")
table1.createOrReplaceTempView("table1")

table2 = spark.read.parquet("s3://.../table2")
table2.createOrReplaceTempView("table2")

table3 = spark.read.parquet("s3://.../table3")
table3.createOrReplaceTempView("table3")

table4 = spark.read.parquet("s3://.../table4")
table4.createOrReplaceTempView("table4")

Final_table = spark.sql("""
select
      a.col1
      a.col2
...
      d.coln
 from

        table1 a
        left outer join
        table2 b
        on
        cond1
        cond2
        cond3
        left outer join
        table3 c
        on
...
        """)

Final_table.count()
# 3813731240

output_file="s3://.../final_table/"

final_table.write.option("partitionOverwriteMode", "dynamic").mode('overwrite').partitionBy("col1").parquet(output_file)

Просто чтобы добавить больше, я пробовал переделать, но не сработало. Кроме того, я пробовал использовать различные кластеры EMR, такие как Cluster1: Master m5.24xlarge

Cluster2: Master m5.24xlarge 1 ядро ​​m5.24xlarge

Cluster3: Master m5d.2xlarge 8 ядер m5d .2xlarge

Версия выпуска EMR 5.29.0

1 Ответ

0 голосов
/ 13 июля 2020

Большинство искровых заданий можно оптимизировать путем визуализации их DAG.

В этом сценарии, если вы можете запустить sql и получить счет за минимальное время, и все ваше время тратится только на запись, тогда вот несколько предложений

  1. Поскольку вы уже знаете счетчик вашего фрейма данных, удалите операцию подсчета, так как это ненужные накладные расходы для вашей работы.
  2. Теперь вы разделяете свои данные на col1, так что лучше попробуйте переразбить ваши данные так, чтобы наименьшее количество перемешиваний выполнялось во время записи.

Вы можете сделать что-то вроде

df.repartition('col1', 100).write

Также вы можете установить число на основе количества разделов, если вы его знаете.

...