Есть ли способ сохранить нужное количество файлов при выполнении перезаписи вставки с помощью spark? - PullRequest
0 голосов
/ 01 апреля 2020

Я постоянно использую запрос на вставку таблицы перезаписи таблицы имя_раздела (partition_column) для записи данных в мою таблицу, но проблема здесь заключается в количестве сгенерированных файлов.

, поэтому я начал использовать spark. sql .shuffle.partitions свойство для фиксирования количества файлов.

Теперь проблема в том, что в каком-то разделе меньше данных, а в некоторых разделах очень много данных. Когда это происходит, когда я выбираю свой случайный порядок воспроизведения. разделы в соответствии с моими большими данными разделов, создаются ненужные небольшие файлы, и если я выбираю случайные разделы в соответствии с разделами с низким объемом данных, задание начинает сбой из-за проблем с памятью.

Есть ли хороший способ решить эту проблему?

Ответы [ 3 ]

0 голосов
/ 02 апреля 2020

Функция, которую вы ищете, - это Size Estimator, она будет возвращать количество байтов вашего файла. Spark ужасен, когда дело доходит до файлов и количества файлов. Чтобы контролировать количество выводимых файлов, вы захотите выполнить команду перераспределения, потому что количество выходных файлов из Spark напрямую связано с количеством разделов, которые имеет объект. Для моего примера ниже я беру размер произвольного входного фрейма данных и нахожу «истинное» количество разделов (причина для + 1 заключается в том, что Spark на long и ints внутренне округляется, так что 0 разделов были бы невозможны.

Надеюсь, это поможет!

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.DataFrame 
import org.apache.spark.util.SizeEstimator 

val inputDF2 : Long = SizeEstimator.estimate(inputDF.rdd) 
//find its appropiate number of partitions 
val numPartitions : Long = (inputDF2/134217728) + 1 
//write it out with that many partitions  
val outputDF = inputDF.repartition(numPartitions.toInt) 
0 голосов
/ 02 апреля 2020

Как правило, файлы в HDFS являются неизменяемыми - их нельзя изменить напрямую. (Файл в HDFS может быть удален и может быть перезаписан новой версией файла, но в целом файл в HDFS не может быть изменен напрямую.) Таким образом, каждый раз, когда вы запускаете инструкцию INSERT, Hive или Impala создает новый файл в каталоге хранения таблицы для хранения новых значений данных, указанных в операторе. Таким образом, вставка данных небольшими партиями приводит к тому, что Hive или Impala создают множество небольших файлов в каталоге хранения таблицы. Это проблема.

0 голосов
/ 01 апреля 2020

В этом случае необходимо учитывать .repartition(), так как перераспределение приводит к разделам почти одинакового размера, что еще больше увеличивает время обработки!

  • Нужно указать число разделов к фрейму данных, основанных на числе фреймов данных..et c и примените перераспределение.

  • См. эту ссылку на динамически создать перераспределение на основе числа строки в кадре данных.

...