Spark Clustered By / Bucket по набору данных, не использующему память - PullRequest
0 голосов
/ 20 ноября 2018

Я недавно сталкивался с Spark bucketby / clusterteredby здесь .

Я пытался имитировать это для исходного файла объемом 1,1 ТБ из S3 (уже в паркете).План состоит в том, чтобы полностью избежать перемешивания, поскольку большинство наборов данных всегда объединяются в столбце «id».Вот что я делаю:

myDf.repartition(20)
    .write.partitionBy("day")
    .option("mode", "DROPMALFORMED")
    .option("compression", "snappy")
    .option("path","s3://my-bucket/folder/1year_data_bucketed/").mode("overwrite")
.format("parquet").bucketBy(20,"id").sortBy("id").saveAsTable("myTable1YearBucketed")

В другом кластере EMR я создаю таблицу и получаю к ней доступ.

CREATE TABLE newtable_on_diff_cluster (id string, day date, col1 double, col2  double) USING PARQUET OPTIONS (
path "s3://my-bucket/folder/1year_data_bucketed/"
)
CLUSTERED BY (id) INTO 20 BUCKETS

Создайте фрейм данных scala и соедините его с другой таблицей.из тех же 20 полей столбца идентификатора.

val myTableBucketedDf = spark.table("newtable_on_diff_cluster")
val myDimTableBucketedDf = spark.table("another_table_with_same_bucketing")
val joinedOutput = myTableBucketedDf.join(myDimTableBucketedDf, "id")
joinedOutput.show()

Вот мои вопросы:

  1. Я вижу, что даже при перераспределении shuffle все еще удаляется в плане объяснения, что хорошо,Есть ли какие-либо проблемы с использованием перераспределения, раздела, bucketBy описанным выше способом?
  2. Приведенное выше соединение не похоже на использование памяти в моем кластере EMR от Ganglia.При объединении файлов Regular в формате паркета без группирования они, кажется, работают быстрее в памяти при меньшем количестве дневных разделов.Я не проверял это больше дней.Как именно здесь обрабатывается объединение?Есть ли способ избежать оператора CREATE TABLE sql и вместо этого использовать метаданные паркета для определения схемы таблицы с использованием scala?Я не хочу повторять имена столбцов, типы данных, когда они фактически доступны в паркете.
  3. Каково идеальное количество сегментов или размер отдельного файла после сегмента с точки зрения доступной памяти на исполнителе?Если уникальное количество значений в столбце идентификатора находится в диапазоне ~ 100 ММ, то, если я правильно понимаю, 20 сегментов разделят каждый сегмент как уникальные идентификаторы 5 мм.Я понимаю, что сортировка здесь не соблюдается из-за того, что Spark для BucketBy создает несколько файлов.Какова рекомендация для перераспределения / размера конечного файла / количества сегментов в этом случае.
...