Эффективное разбиение Spark DataFrame на две части с фильтрацией только один раз - PullRequest
0 голосов
/ 24 марта 2020

Скажем, у нас есть Dataframe dfSource, который является нетривиальным (например, результатом различных объединений и т. Д. c.) И имеет большой размер (например, 100k + строки), и у него есть столбец some_boolean, который Я хочу использовать для разделения, как это:

val dfTrue = dfSource.where(col("some_boolean") === true)
// write dfTrue, e.g. dfTrue.write.parquet("data1")
val dfFalse = dfSource.where(col("some_boolean") === false)
// write dfFalse, e.g. dfFalse.write.parquet("data2")

Теперь это приведет к сканированию и фильтрации данных дважды, верно? Есть ли способ сделать это более эффективно? Я думал о чем-то вроде

val (dfTrue, dfFalse) = dfSource.split(col("some_boolean") === true)
// write dfTrue and dfFalse

1 Ответ

0 голосов
/ 25 марта 2020

Я вижу, что вы сохраняете вывод после разделения. Вы можете использовать partitionPy при записи следующим образом:

dfSource = spark.createDataFrame([
    ['a', True],
    ['b', False],
    ['c', True],
    ['d', True],
    ['e', False],
    ['f', False]
], ["col1", "col2"]).cache()
dfSource.show()

+----+-----+
|col1| col2|
+----+-----+
|   a| true|
|   b|false|
|   c| true|
|   d| true|
|   e|false|
|   f|false|
+----+-----+


dfSource.write.partitionBy("col2").parquet("/tmp/df")

Вы увидите эти два каталога /tmp/df/col2=true и /tmp/df/col2=false

Теперь вы можете читать их как обычно

dfTrue = spark.read.parquet("/tmp/df/col2=true")
dfTrue.show()
+----+
|col1|
+----+
|   a|
|   c|
|   d|
+----+

dfFalse = spark.read.parquet("/tmp/df/col2=false")
dfFalse.show()
+----+
|col1|
+----+
|   b|
|   e|
|   f|
+----+
...