Как в Spark читать файлы паркета, написанные с помощью bucketBy, и сохранять данные сегментирования? - PullRequest
1 голос
/ 05 мая 2020

В Apache Spark 2.4.5, как открыть набор файлов паркета, которые были написаны с помощью bucketBy и saveAsTable?

Например:

case class VeryVeryDeeplyNestedThing(
  s: String,
  nested1: OtherVeryDeeplyNestedThing
)
case class OtherVeryDeeplyNestedThing (
  youGetTheIdeaNoOneWantsToHandWriteASqlStatementForThese: NestedMcNesty
)
List(VeryVeryDeeplyNestedThing(...)).toDS()
  .write
  .bucketBy(512, "s")
  .option("path", "/tmp/output")
  .format("parquet")
  .saveAsTable("mytable")

Теперь есть набор паркетных пилок /tmp/output. Переместите файлы из / tmp / output в / tmp / newPlace и запустите совершенно новый сеанс Spark.

spark.read.parquet("/tmp/newPlace")
  .whatGoesHere?

Что вам нужно сделать, чтобы прочитать их обратно с той же информацией сегментирования, что и они написано с помощью? Не похоже, что эта информация запекается в самих файлах паркета, или это то, что происходит?

[Изменить: добавлен рабочий пример частично из https://kb.databricks.com/_static/notebooks/data/bucketing-example.html на @thebluephantom Я думаю показывает, что для чтения действительно требуется что-то особенное]

Если вы создаете паркетные файлы следующим образом:

scala> def base = spark.range(1, 160000, 1, 16).select($"id" as "key", rand(12) as "value")
base: org.apache.spark.sql.DataFrame

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

scala> base.write.format("parquet").bucketBy(16, "key").sortBy("value").option("path", "/tmp/example").mode(SaveMode.Overwrite).saveAsTable("bucketed")

scala> base.write.format("parquet").option("path", "/tmp/exampleunbucketed").mode(SaveMode.Overwrite).saveAsTable("unbucketed")

scala> val t2 = spark.table("bucketed")
t2: org.apache.spark.sql.DataFrame = [key: bigint, value: double]

scala> val t3 = spark.table("bucketed")
t3: org.apache.spark.sql.DataFrame = [key: bigint, value: double]

// This is joining two bucketed tables

scala> t3.join(t2, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#58]
+- *(2) BroadcastHashJoin [key#51L], [key#57L], Inner, BuildRight
   :- *(2) Project [key#51L, value#52]
   :  +- *(2) Filter isnotnull(key#51L)
   :     +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *(1) Project [key#57L, value#58]
         +- *(1) Filter isnotnull(key#57L)
            +- *(1) FileScan parquet default.bucketed[key#57L,value#58] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16

Это имеет FileScan parquet default.bucketed с обеих сторон. Теперь просто прочитайте паркетные файлы и объясните соединение:

scala> val t4 = spark.read.parquet("/tmp/example")
t4: org.apache.spark.sql.DataFrame = [key: bigint, value: double]

scala> t3.join(t4, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#64]
+- *(2) BroadcastHashJoin [key#51L], [key#63L], Inner, BuildRight
   :- *(2) Project [key#51L, value#52]
   :  +- *(2) Filter isnotnull(key#51L)
   :     +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *(1) Project [key#63L, value#64]
         +- *(1) Filter isnotnull(key#63L)
            +- *(1) FileScan parquet [key#63L,value#64] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>

t4 не имеет ничего, что указывает на то, что он разделен на сегменты.

Имеет ли это значение? Это все еще в коробках? Я неправильно понимаю вывод объяснения? Или мне нужно что-то сделать, чтобы убедиться, что t4 использует ведра?

Ответы [ 2 ]

2 голосов
/ 05 мая 2020

Нет. bucketBy - это настолько простой API на основе таблиц.

Используйте bucket by, чтобы впоследствии отсортировать таблицы и сделать последующие JOINs быстрее, избегая перетасовки. Используйте, таким образом, для ETL для временной, промежуточной обработки результатов в целом.

Чтение не требует ничего особенного, добавляется к запросу, но JOINed таблицы должны быть разделены на сегменты и иметь одинаковое количество сегментов и разделов. Смотрите этот отличный пост: https://kb.databricks.com/_static/notebooks/data/bucketing-example.html. Кроме того, spark sql случайные разделы должны равняться количеству сегментов.

UPDATE

В случае небольших данных может произойти широковещательное соединение ha sh, поэтому установите следующее:

spark.conf.set("spark.sql.sources.bucketing.enabled", true)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

Также используйте spark.table Я предлагаю, а не spark.read.parquet... bucketBy ony работает только с table api. См. https://engineering.taboola.com/bucket-the-shuffle-out-of-here/

0 голосов
/ 05 мая 2020

Когда мы используем сегментирование или кластеризацию при записи данных, они разделяют данные, сохраненные как несколько файлов. Например:

id,name,city
1,a,CA
2,b,NYC
3,c,NYC
4,d,CA

#So after bucketing based on city two file will be created 
id,name,city
1,a,CA
4,d,CA

and
id,name,city
2,b,NYC
3,c,NYC

Итак, когда мы читаем файл из нового места, мы сможем прочитать все данные.

Группирование помогает, когда вы хотите предикатировать выталкивание некоторых условий, поскольку оно ограничивает искру читать только определенные c файлы.

Надеюсь, что ответит.

...