Использование разделов (с partitionBy) при написании дельта-озера не имеет никакого эффекта - PullRequest
4 голосов
/ 15 января 2020

Когда я изначально пишу дельта-озеро, используя разделы (с partitionBy) или нет, это не имеет никакого значения.

Использование перераспределения в одном и том же столбце перед записью изменяет только количество parquet-файлов , Явное указание столбца для разделения «not nullable» не меняет эффекта.

Версии:

  • Spark 2.4 (фактически 2.4.0.0-mapr-620)
  • Scala 2.11.12
  • Дельта Лейк 0.5.0 (io.delta:delta-core_2.11:jar:0.5.0)
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val tmp = spark.createDataFrame(
    spark.sparkContext.parallelize((1 to 10).map(n => Row(n, n % 3))), 
    StructType(Seq(StructField("CONTENT", IntegerType), StructField("PARTITION", IntegerType))))

/* 
tmp.show
+-------+---------+
|CONTENT|PARTITION|
+-------+---------+
|      1|        1|
|      2|        2|
|      3|        0|
|      4|        1|
|      5|        2|
|      6|        0|
|      7|        1|
|      8|        2|
|      9|        0|
|     10|        1|
+-------+---------+
tmp.printSchema
root
 |-- CONTENT: integer (nullable = true)
 |-- PARTITION: integer (nullable = true)
*/

tmp.write.format("delta").partitionBy("PARTITION").save("PARTITIONED_DELTA_LAKE")

Полученная дельта -lake каталог выглядит следующим образом:

ls -1 PARTITIONED_DELTA_LAKE
_delta_log
    00000000000000000000.json
part-00000-a3015965-b101-4f63-87de-1d06a7662312-c000.snappy.parquet
part-00007-3155dde1-9f41-49b5-908e-08ce6fc077af-c000.snappy.parquet
part-00014-047f6a28-3001-4686-9742-4e4dbac05c53-c000.snappy.parquet
part-00021-e0d7f861-79e9-41c9-afcd-dbe688720492-c000.snappy.parquet
part-00028-fe3da69d-660a-445b-a99c-0e7ad2f92bf0-c000.snappy.parquet
part-00035-d69cfb9d-d320-4d9f-9b92-5d80c88d1a77-c000.snappy.parquet
part-00043-edd049a2-c952-4f7b-8ca7-8c0319932e2d-c000.snappy.parquet
part-00050-38eb3348-9e0d-49af-9ca8-a323e58b3712-c000.snappy.parquet
part-00057-906312ad-8556-4696-84ba-248b01664688-c000.snappy.parquet
part-00064-31f5d03d-2c63-40e7-8fe5-a8374eff9894-c000.snappy.parquet
part-00071-e1afc2b9-aa5b-4e7c-b94a-0c176523e9f1-c000.snappy.parquet

cat PARTITIONED_DELTA_LAKE/_delta_log/00000000000000000000.json
{"commitInfo":{"timestamp":1579073383370,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"2cdd6fbd-bffa-415e-9c06-94ffc2048cbe","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"CONTENT\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"PARTITION\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1579073381183}}
{"add":{"path":"part-00000-a3015965-b101-4f63-87de-1d06a7662312-c000.snappy.parquet","partitionValues":{},"size":363,"modificationTime":1579073382329,"dataChange":true}}
{"add":{"path":"part-00007-3155dde1-9f41-49b5-908e-08ce6fc077af-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382545,"dataChange":true}}
{"add":{"path":"part-00014-047f6a28-3001-4686-9742-4e4dbac05c53-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382237,"dataChange":true}}
{"add":{"path":"part-00021-e0d7f861-79e9-41c9-afcd-dbe688720492-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382583,"dataChange":true}}
{"add":{"path":"part-00028-fe3da69d-660a-445b-a99c-0e7ad2f92bf0-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382893,"dataChange":true}}
{"add":{"path":"part-00035-d69cfb9d-d320-4d9f-9b92-5d80c88d1a77-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382488,"dataChange":true}}
{"add":{"path":"part-00043-edd049a2-c952-4f7b-8ca7-8c0319932e2d-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073383262,"dataChange":true}}
{"add":{"path":"part-00050-38eb3348-9e0d-49af-9ca8-a323e58b3712-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382683,"dataChange":true}}
{"add":{"path":"part-00057-906312ad-8556-4696-84ba-248b01664688-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382416,"dataChange":true}}
{"add":{"path":"part-00064-31f5d03d-2c63-40e7-8fe5-a8374eff9894-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382549,"dataChange":true}}
{"add":{"path":"part-00071-e1afc2b9-aa5b-4e7c-b94a-0c176523e9f1-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382511,"dataChange":true}}

Я бы ожидал что-то вроде

ls -1 PARTITIONED_DELTA_LAKE
_delta_log
    00000000000000000000.json
PARTITION=0
   part-00000-a3015965-b101-4f63-87de-1d06a7662312-c000.snappy.parquet
   ...

cat PARTITIONED_DELTA_LAKE/_delta_log/00000000000000000000.json
..."partitionBy":"[PARTITION]"...
..."partitionColumns":[PARTITION]...
..."partitionValues":{0}...

1 Ответ

0 голосов
/ 16 января 2020

Как прокомментировал Jacek , используемая версия Spark слишком старая. Я пробовал приведенный выше код для Spark-версий:

  • 2.4.0
  • 2.4.1
  • 2.4.2

Только с 2.4.2 разбиение работает как положено. В этом выпуске это исправление может быть причиной устранения проблемы:

.. Пользователи могут указывать столбцы в partitionBy, и наши внутренние источники данных будут использовать эту информацию. К сожалению, для внешних систем эти данные автоматически отбрасываются без обратной связи с пользователем.

...