All
У нас есть много файлов в ионном формате, которые мы должны сделать некоторые вычисления для данных, прежде чем сгруппировать и отфильтровать их (их общий размер составляет около 1,3 ТБ, каждый файл составляет около 200–300 МБ).
Я пытался сделать эти 2 разных способа.
Сначала нужно распараллелить список файлов из S3, который мне нужен, и преобразовать его в RDD [Row]. Затем создайте фреймы данных из него.
val rdd = ss.sparkContext.parallelize(suspendedList, suspendedList.size).flatMap(chunk => {
Ranger.consumeStreamToRow(chunk, dfSchema.value)
})
val df = ss.createDataFrame(rdd, schema)
Второй способ - распараллелить список файлов и записать его в файлы паркета в S3 с помощью AvroParquetWriter. Загрузите его обратно как dataFrame
val rdd = ss.sparkContext.parallelize(suspendedList, suspendedList.size).foreach( chunk => {
Ranger.writeParquetFile(chunk, avroSchema.value, TaskContext.getPartitionId())
})
Второй способ оказался намного быстрее первого, хотя он должен писать и читать с S3.
Одна вещь, на которую я обращаю внимание, - это то, что первый метод, по-видимому, имеет очень много времени для сбора мусора по сравнению со следующим, что, я думаю, способствует более длительному времени, необходимому для фактической обработки данных.
Может кто-нибудь объяснить, что происходит с первым и вторым методами, и почему второй метод оказался намного быстрее, хотя я читаю и пишу в S3?
(Я установил для памяти исполнителя 10 ГБ, а для драйвера - 40 ГБ, и я использую Spark EMR)