Я недавно сталкивался с Spark bucketby / clusterteredby здесь .
Я пытался имитировать это для исходного файла объемом 1,1 ТБ из S3 (уже в паркете).План состоит в том, чтобы полностью избежать перемешивания, поскольку большинство наборов данных всегда объединяются в столбце «id».Вот что я делаю:
myDf.repartition(20)
.write.partitionBy("day")
.option("mode", "DROPMALFORMED")
.option("compression", "snappy")
.option("path","s3://my-bucket/folder/1year_data_bucketed/").mode("overwrite")
.format("parquet").bucketBy(20,"id").sortBy("id").saveAsTable("myTable1YearBucketed")
В другом кластере EMR я создаю таблицу и получаю к ней доступ.
CREATE TABLE newtable_on_diff_cluster (id string, day date, col1 double, col2 double) USING PARQUET OPTIONS (
path "s3://my-bucket/folder/1year_data_bucketed/"
)
CLUSTERED BY (id) INTO 20 BUCKETS
Создайте фрейм данных scala и соедините его с другой таблицей.из тех же 20 полей столбца идентификатора.
val myTableBucketedDf = spark.table("newtable_on_diff_cluster")
val myDimTableBucketedDf = spark.table("another_table_with_same_bucketing")
val joinedOutput = myTableBucketedDf.join(myDimTableBucketedDf, "id")
joinedOutput.show()
Вот мои вопросы:
- Я вижу, что даже при перераспределении shuffle все еще удаляется в плане объяснения, что хорошо,Есть ли какие-либо проблемы с использованием перераспределения, раздела, bucketBy описанным выше способом?
- Приведенное выше соединение не похоже на использование памяти в моем кластере EMR от Ganglia.При объединении файлов Regular в формате паркета без группирования они, кажется, работают быстрее в памяти при меньшем количестве дневных разделов.Я не проверял это больше дней.Как именно здесь обрабатывается объединение?Есть ли способ избежать оператора CREATE TABLE sql и вместо этого использовать метаданные паркета для определения схемы таблицы с использованием scala?Я не хочу повторять имена столбцов, типы данных, когда они фактически доступны в паркете.
- Каково идеальное количество сегментов или размер отдельного файла после сегмента с точки зрения доступной памяти на исполнителе?Если уникальное количество значений в столбце идентификатора находится в диапазоне ~ 100 ММ, то, если я правильно понимаю, 20 сегментов разделят каждый сегмент как уникальные идентификаторы 5 мм.Я понимаю, что сортировка здесь не соблюдается из-за того, что Spark для BucketBy создает несколько файлов.Какова рекомендация для перераспределения / размера конечного файла / количества сегментов в этом случае.