Как применить перераспределение используя окно в pyspark? - PullRequest
0 голосов
/ 30 марта 2020

У меня есть столбец Foo, который содержит double значение, такое как:

[ 100.4, 39.6, 98.2, 10.8, 62.1, 69.6 … ]

Я хотел бы перераспределить, используя окно 10, которое сгенерирует набор данных, например:

Foo=10
Foo=20
Foo=30
Foo=40
Foo=50
Foo=60
Foo=70
Foo=80
Foo=90
Foo=100
Foo=110

Использование repartiton(number: int, colname: str) разделяет фрейм данных на заданные number файлов. Но я не могу выбрать окно.

Так как это сделать в pyspark?

спасибо

Ответы [ 2 ]

1 голос
/ 31 марта 2020

Добавление к ответу Даниэля .

+-----+----------+
|  Foo|Foo_binned|
+-----+----------+
|100.4|       100|
| 39.6|        30|
| 98.2|        90|
| 10.8|        10|
| 62.1|        60|
| 69.6|        60|
+-----+----------+

Это гарантирует, что для каждого диапазона foo вы получите 1 файл.

from pyspark.sql import functions as F
n = df.select(F.col('Foo_binned')).distinct().count()

data.repartition(n)\
     .write \
     .partitionBy("Foo_binned")\
     .csv(path)
1 голос
/ 31 марта 2020

Я не уверен, что вы подразумеваете под перераспределением, но в любом случае, если у вас есть df из:

+-----+
|  Foo|
+-----+
|100.4|
| 39.6|
| 98.2|
| 10.8|
| 62.1|
| 69.6|
+-----+

Вы можете легко округлить значения:

from pyspark.sql.functions import col, floor
df2 = df.withColumn('Foo_binned', floor(col('Foo') / 10) * 10)
+-----+----------+
|  Foo|Foo_binned|
+-----+----------+
|100.4|       100|
| 39.6|        30|
| 98.2|        90|
| 10.8|        10|
| 62.1|        60|
| 69.6|        60|
+-----+----------+

Если это результат, который вы ищете, вы можете выбрать / переименовать только новый столбец. Вы также можете просто изменить метод округления в зависимости от ваших требований (floor, round, ceil).

Если при перераспределении вы действительно хотите физически сохранить значения в разных папках на основе Если разбить на 10, вы можете запустить:

df2.write.partitionBy('Foo_binned').csv('./foos.csv')

, который будет разбивать данные при сохранении:

30.03.2020  23:05                 8 ._SUCCESS.crc
30.03.2020  23:05    <DIR>          Foo_binned=10
30.03.2020  23:05    <DIR>          Foo_binned=100
30.03.2020  23:05    <DIR>          Foo_binned=30
30.03.2020  23:05    <DIR>          Foo_binned=60
30.03.2020  23:05    <DIR>          Foo_binned=90
30.03.2020  23:05                 0 _SUCCESS

И последнее, но не менее важное, если вы просто хотите, чтобы данные в памяти были разделены этих ведер довольно сложно достичь, потому что, ну, вы не должны этого делать. Spark включает в себя механизм оптимизации, который будет работать лучше, если вы просто дадите ему

df = spark.createDataFrame([ (100.2,), (100.1,), (100.7,), (100.4,), (39.6, ), (39.6, ), (39.6, ), (39.6, ), (98.2, ), (10.8, ), (10.2, ), (10.8, ), (10.8, ), (62.1, ), (69.6, )], ['Foo'])
df2 = df.repartitionByRange('Foo')
print('No of partitions', df2.rdd.getNumPartitions())

No of partitions 8

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...