Как управлять физическим размещением данных в рамках кластера с помощью pyspark? - PullRequest
2 голосов
/ 04 февраля 2020

Скажем, у меня есть фрейм данных pyspark 'data' следующим образом. Я хочу разделить данные по «Период». Скорее, я хочу, чтобы каждый период данных был сохранен в своем собственном разделе (см. Пример под фреймом данных ниже).

data = sc.parallelize([[1,1,0,14277.4,0], \
[1,2,0,14277.4,0], \
[2,1,0,4741.91,0], \
[2,2,0,4693.03,0], \
[3,1,2,9565.93,0], \
[3,2,2,9566.05,0], \
[4,2,0,462.68,0], \
[5,1,1,3549.66,0], \
[5,2,5,3549.66,1], \
[6,1,1,401.52,0], \
[6,2,0,401.52,0], \
[7,1,0,1886.24,0], \
[7,2,0,1886.24,0]]) \
.toDF(("Acct","Period","Status","Bal","CloseFlag"))

data.show(100)

+----+------+------+-------+---------+
|Acct|Period|Status|    Bal|CloseFlag|
+----+------+------+-------+---------+
|   1|     1|     0|14277.4|        0|
|   1|     2|     0|14277.4|        0|
|   2|     1|     0|4741.91|        0|
|   2|     2|     0|4693.03|        0|
|   3|     1|     2|9565.93|        0|
|   3|     2|     2|9566.05|        0|
|   4|     2|     0| 462.68|        0|
|   5|     1|     1|3549.66|        0|
|   5|     2|     5|3549.66|        1|
|   6|     1|     1| 401.52|        0|
|   6|     2|     0| 401.52|        0|
+----+------+------+-------+---------+

Например

Раздел 1:

+----+------+------+-------+---------+
|Acct|Period|Status|    Bal|CloseFlag|
+----+------+------+-------+---------+
|   1|     1|     0|14277.4|        0|
|   2|     1|     0|4741.91|        0|
|   3|     1|     2|9565.93|        0|
|   5|     1|     1|3549.66|        0|
|   6|     1|     1| 401.52|        0|
+----+------+------+-------+---------+

Раздел 2:

+----+------+------+-------+---------+
|Acct|Period|Status|    Bal|CloseFlag|
+----+------+------+-------+---------+
|   1|     2|     0|14277.4|        0|
|   2|     2|     0|4693.03|        0|
|   3|     2|     2|9566.05|        0|
|   4|     2|     0| 462.68|        0|
|   5|     2|     5|3549.66|        1|
|   6|     2|     0| 401.52|        0|
+----+------+------+-------+---------+

1 Ответ

4 голосов
/ 04 февраля 2020

Подход должен состоять в том, чтобы сначала перераспределить, чтобы получить правильное количество разделов (количество уникальных периодов), а затем разделить по столбцу Период перед сохранением.

from pyspark.sql import functions as F
n = data.select(F.col('Period')).distinct().count()

data.repartition(n)\
     .write \
     .partitionBy("Period")\
     .mode("overwrite")\
     .format("parquet")\
     .saveAsTable("testing")
...