У меня есть много фреймов данных с данными клиентов и историей для разных юридических лиц, упрощенно:
Val myData = spark.createDataframe(Seq(
(1, 1, “a lot of Data”, 2010-01-01-10.00.00”),
(1, 1, “a lot of Data”, 2010-01-20-10.31.00”),
(1, 1, “a lot of Data”, 2019-06-16-12.00.00”),
(2, 5, “a lot of Data”, 2010-01-01-10.00.00”),
(2,6, “a lot of Data”, 2010-01-01-10.00.00”),
(3, 7, “a lot of Data”, 2010-01-01-10.00.00”)))
.toDF(“legalentity”, “customernumber”,”anydata”,”changetimestamp”)
Эти фреймы данных хранятся в виде паркетных файлов и имеют внешние таблицы кустов. Временная метка изменения преобразуется в> действительна из <,> действительна в <по представлениям, например </p>
CREATE VIEW myview
AS SELECT
Legalentity, customernumber, anydata,
Changetimestamp as valid_from,
Coalesce(lead(changetimestamp) over (PARTITION by legalentity, customernumber ORDER BY changetimestamp ASC), “9999-12-31-00.00.00”) as valid_to
(это упрощено, внутри требуются некоторые преобразования временных меток)
Там позже будет много соединений между фреймами данных / таблицами улья. Эти фреймы данных хранятся следующим образом:
myDf
.orderBy(col(“legalentity”), col(“customernumber”))
.write
.format(“parquet_format”)
.mode(SaveMode.Append)
.partitionBy(“legalentity”)
.save(outputpath)
По юридическим причинам данные разных юридических лиц должны храниться в разных путях hdfs, что выполняется с помощью предложения partitionBy, которое создает отдельную папку для каждого юридического лица. Есть маленькие и большие юридические лица с огромным количеством клиентов и другие с небольшим количеством клиентов. Количество случайных разделов усредняется по всем юридическим лицам, это нормально.
Проблемы:
- Больше нет столбцов для разделения dataframe возможны: если мы хотим ускорить все, добавив повторное разделение с большим количеством столбцов в качестве предложения partitionBy для записи, например:
myDf
.orderBy(col(“legalentity”), col(“customernumber”))
.repartition(col(“legalentity”), col(“customernumber”))
.write
.format(“parquet_format”)
.mode(SaveMode.Append)
.partitionBy(“legalentity”)
.save(outputpath)
Количество разделов случайного выбора используется в каждой папке юридического лица .
Это вызывает разделы => юридическое лицо <*> количество перемешанных разделов <</p>
Слишком много разделов Есть маленькие и большие фреймы данных / таблицы. Все они получают одинаковое количество разделов в случайном порядке, поэтому небольшие фреймы данных имеют размер раздела 3 МБ или меньше. Если мы используем разное количество разделов для каждой таблицы, так что размер файла приближается к 128 МБ, все замедляется.
Мы получаем новые данные каждый день, которые мы просто добавляем, но для этого мы не используем количество перемешиваемых разделов, мы переразбиваем (1). Иногда нам приходится перезагружать все, чтобы сжать все эти разделы, но наши процессы не замедляются новыми ежедневными данными.