Шаг EMR Spark для добавления в файлы паркета перезаписывает файлы паркета - PullRequest
1 голос
/ 10 июля 2019

Spark 2.4.2 в кластере Amazon EMR (1 мастер, 2 узла) с использованием Python 3.6

Я читаю объекты в Amazon s3, сжимаю их в формате паркета и добавляю (добавляю) в существующее хранилище данных паркета. Когда я запускаю свой код в оболочке pyspark, я могу читать / сжимать объекты и добавлять новые файлы паркета в существующие файлы паркета, и когда я выполняю запрос к данным паркета, он показывает, что все данные находятся в папка для паркета. Однако, когда я выполняю код в шаге на моем кластере EMR, существующие файлы паркета перезаписываются новыми файлами. Тот же запрос покажет, что есть только новые данные, а в папке s3 с данными паркета только новые данные.

Вот код ключа шага:

    spark = SparkSession.builder \
                        .appName("myApp") \
                        .getOrCreate()

    df_p = spark.read \
                .format('parquet') \
                .load(parquet_folder)

    the_schema = df_p.schema

    df2 = spark.read \
               .format('com.databricks.spark.xml') \
               .options(rowTag='ApplicationSubmission', \
                        path=input_folder) \
               .schema(the_schema) \
               .load(input_folder+'/*.xml')

    df2.coalesce(10) \
       .write \
       .option('compression', 'snappy') \
       .option('path', parquet_folder) \
       .format('parquet') \
       .mode('append') \
       .saveAsTable(table_name, mode='append')

Я ожидал бы, что это добавит данные из input_folder к существующим данным в parquet_folder, но перезаписывается при выполнении на шаге EMR. Я пробовал без mode='append' в .saveAsTable (в оболочке pyspark это было не нужно).

Предложения

1 Ответ

0 голосов
/ 10 июля 2019

Я не знаю, почему ваш метод не работает, но у меня были лучшие результаты, если вы используете .parquet(path) вместо .saveAsTable(...). Я не знаю причину такого поведения, но я не видел saveAsTable, который раньше использовался для сохранения объектов данных, так как он создает таблицу в метастафе Hive (который не является «физическим» объектом данных).

Если ваши шаги выполняются через Apache Livy, они могут вести себя не так, как в оболочке. Если вы действительно используете Livy, вы можете протестировать свой код на блокноте Zeppelin, указав на ячейках кода, что вы должны запускать их с помощью %livy-pyspark executor.

...