В Apache Spark 2.4.5, как открыть набор файлов паркета, которые были написаны с помощью bucketBy и saveAsTable?
Например:
case class VeryVeryDeeplyNestedThing(
s: String,
nested1: OtherVeryDeeplyNestedThing
)
case class OtherVeryDeeplyNestedThing (
youGetTheIdeaNoOneWantsToHandWriteASqlStatementForThese: NestedMcNesty
)
List(VeryVeryDeeplyNestedThing(...)).toDS()
.write
.bucketBy(512, "s")
.option("path", "/tmp/output")
.format("parquet")
.saveAsTable("mytable")
Теперь есть набор паркетных пилок /tmp/output
. Переместите файлы из / tmp / output в / tmp / newPlace и запустите совершенно новый сеанс Spark.
spark.read.parquet("/tmp/newPlace")
.whatGoesHere?
Что вам нужно сделать, чтобы прочитать их обратно с той же информацией сегментирования, что и они написано с помощью? Не похоже, что эта информация запекается в самих файлах паркета, или это то, что происходит?
[Изменить: добавлен рабочий пример частично из https://kb.databricks.com/_static/notebooks/data/bucketing-example.html на @thebluephantom Я думаю показывает, что для чтения действительно требуется что-то особенное]
Если вы создаете паркетные файлы следующим образом:
scala> def base = spark.range(1, 160000, 1, 16).select($"id" as "key", rand(12) as "value")
base: org.apache.spark.sql.DataFrame
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> base.write.format("parquet").bucketBy(16, "key").sortBy("value").option("path", "/tmp/example").mode(SaveMode.Overwrite).saveAsTable("bucketed")
scala> base.write.format("parquet").option("path", "/tmp/exampleunbucketed").mode(SaveMode.Overwrite).saveAsTable("unbucketed")
scala> val t2 = spark.table("bucketed")
t2: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
scala> val t3 = spark.table("bucketed")
t3: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
// This is joining two bucketed tables
scala> t3.join(t2, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#58]
+- *(2) BroadcastHashJoin [key#51L], [key#57L], Inner, BuildRight
:- *(2) Project [key#51L, value#52]
: +- *(2) Filter isnotnull(key#51L)
: +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(1) Project [key#57L, value#58]
+- *(1) Filter isnotnull(key#57L)
+- *(1) FileScan parquet default.bucketed[key#57L,value#58] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
Это имеет FileScan parquet default.bucketed
с обеих сторон. Теперь просто прочитайте паркетные файлы и объясните соединение:
scala> val t4 = spark.read.parquet("/tmp/example")
t4: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
scala> t3.join(t4, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#64]
+- *(2) BroadcastHashJoin [key#51L], [key#63L], Inner, BuildRight
:- *(2) Project [key#51L, value#52]
: +- *(2) Filter isnotnull(key#51L)
: +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(1) Project [key#63L, value#64]
+- *(1) Filter isnotnull(key#63L)
+- *(1) FileScan parquet [key#63L,value#64] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
t4 не имеет ничего, что указывает на то, что он разделен на сегменты.
Имеет ли это значение? Это все еще в коробках? Я неправильно понимаю вывод объяснения? Или мне нужно что-то сделать, чтобы убедиться, что t4 использует ведра?