Spark 2.4.2 в кластере Amazon EMR (1 мастер, 2 узла) с использованием Python 3.6
Я читаю объекты в Amazon s3, сжимаю их в формате паркета и добавляю (добавляю) в существующее хранилище данных паркета. Когда я запускаю свой код в оболочке pyspark, я могу читать / сжимать объекты и добавлять новые файлы паркета в существующие файлы паркета, и когда я выполняю запрос к данным паркета, он показывает, что все данные находятся в папка для паркета. Однако, когда я выполняю код в шаге на моем кластере EMR, существующие файлы паркета перезаписываются новыми файлами. Тот же запрос покажет, что есть только новые данные, а в папке s3 с данными паркета только новые данные.
Вот код ключа шага:
spark = SparkSession.builder \
.appName("myApp") \
.getOrCreate()
df_p = spark.read \
.format('parquet') \
.load(parquet_folder)
the_schema = df_p.schema
df2 = spark.read \
.format('com.databricks.spark.xml') \
.options(rowTag='ApplicationSubmission', \
path=input_folder) \
.schema(the_schema) \
.load(input_folder+'/*.xml')
df2.coalesce(10) \
.write \
.option('compression', 'snappy') \
.option('path', parquet_folder) \
.format('parquet') \
.mode('append') \
.saveAsTable(table_name, mode='append')
Я ожидал бы, что это добавит данные из input_folder
к существующим данным в parquet_folder
, но перезаписывается при выполнении на шаге EMR. Я пробовал без mode='append'
в .saveAsTable
(в оболочке pyspark это было не нужно).
Предложения