Мы разработали работу, которая обрабатывает и записывает огромное количество файлов в паркет в Amazon S3 (s3a), используя Spark 2.3. Каждый исходный файл должен создавать отдельный раздел в S3. Код был протестирован (с меньшим количеством файлов) и работал как ожидалось.
Однако после выполнения с использованием реальных данных мы заметили, что некоторые файлы (небольшое количество от общего числа) не были записаны в паркет. Нет ошибок или чего-то странного в журналах. Мы снова протестировали код для отсутствующих файлов, и он заработал worked? Мы хотим использовать код в производственной среде, но нам нужно определить, в чем здесь проблема. Пишем паркет вот так:
dataframe_with_data_to_write.repartition($"field1", $"field2").write.option("compression", "snappy").option("basePath", path_out).partitionBy("field1", "field2", "year", "month", "day").mode(SaveMode.Append).parquet(path_out)
Мы использовали рекомендуемые параметры:
spark.sparkContext.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true")
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Есть ли известная проблема с использованием этих параметров? Может быть, что-то с возможной последовательностью S3? Есть предложения?
Любая помощь будет оценена.