для использования partitionBy () СДР должен состоять из кортежа (пары) объектов.Давайте посмотрим на пример ниже:
Предположим, у меня есть входной файл со следующими данными:
OrderId|OrderItem|OrderDate|OrderPrice|ItemQuantity
1|Gas|2018-01-17|1895|1
1|Air Conditioners|2018-01-28|19000|3
1|Television|2018-01-11|45000|2
2|Gas|2018-01-17|1895|1
2|Air Conditioners|2017-01-28|19000|3
2|Gas|2016-01-17|2300|1
1|Bottle|2018-03-24|45|10
1|Cooking oil|2018-04-22|100|3
3|Inverter|2015-11-02|29000|1
3|Gas|2014-01-09|2300|1
3|Television|2018-01-17|45000|2
4|Gas|2018-01-17|2300|1
4|Television$$|2018-01-17|45000|2
5|Medicine|2016-03-14|23.50|8
5|Cough Syrup|2016-01-28|190|1
5|Ice Cream|2014-09-23|300|7
5|Pasta|2015-06-30|65|2
PATH_TO_FILE="file:///u/vikrant/OrderInputFile"
чтение файла в RDD и пропуск заголовка
RDD = sc.textFile(PATH_TO_FILE)
header=RDD.first();
newRDD = RDD.filter(lambda x:x != header)
Теперь давайте вернемся-разделение СДР на «5» разделов
partitionRDD = newRDD.repartition(5)
позволяет взглянуть, как данные распределяются в этих «5» разделах
print("Partitions structure: {}".format(partitionRDD.glom().collect()))
здесь вы можете видеть, что данные записываются вдва раздела и три из них пусты, а также он не распределен равномерно.
Partitions structure: [[],
[u'1|Gas|2018-01-17|1895|1', u'1|Air Conditioners|2018-01-28|19000|3', u'1|Television|2018-01-11|45000|2', u'2|Gas|2018-01-17|1895|1', u'2|Air Conditioners|2017-01-28|19000|3', u'2|Gas|2016-01-17|2300|1', u'1|Bottle|2018-03-24|45|10', u'1|Cooking oil|2018-04-22|100|3', u'3|Inverter|2015-11-02|29000|1', u'3|Gas|2014-01-09|2300|1'],
[u'3|Television|2018-01-17|45000|2', u'4|Gas|2018-01-17|2300|1', u'4|Television$$|2018-01-17|45000|2', u'5|Medicine|2016-03-14|23.50|8', u'5|Cough Syrup|2016-01-28|190|1', u'5|Ice Cream|2014-09-23|300|7', u'5|Pasta|2015-06-30|65|2'],
[], []]
Нам необходимо создать пару СДР, чтобы данные СДР были распределены равномерно по количеству разделов.Позволяет создать пару СДР и разбить ее на пару ключ-значение.
pairRDD = newRDD.map(lambda x :(x[0],x[1:]))
теперь позволяет перераспределить этот rdd в раздел «5» и распределить данные равномерно по разделам, используя ключ в [0] -й позиции.
newpairRDD = pairRDD.partitionBy(5,lambda k: int(k[0]))
теперь мы можем видеть, что данные распределяются равномерно в соответствии с соответствующими парами ключ-значение.
print("Partitions structure: {}".format(newpairRDD.glom().collect()))
Partitions structure: [
[(u'5', u'|Medicine|2016-03-14|23.50|8'),
(u'5', u'|Cough Syrup|2016-01-28|190|1'),
(u'5', u'|Ice Cream|2014-09-23|300|7'),
(u'5', u'|Pasta|2015-06-30|65|2')],
[(u'1', u'|Gas|2018-01-17|1895|1'),
(u'1', u'|Air Conditioners|2018-01-28|19000|3'),
(u'1', u'|Television|2018-01-11|45000|2'),
(u'1', u'|Bottle|2018-03-24|45|10'),
(u'1', u'|Cooking oil|2018-04-22|100|3')],
[(u'2', u'|Gas|2018-01-17|1895|1'),
(u'2', u'|Air Conditioners|2017-01-28|19000|3'),
(u'2', u'|Gas|2016-01-17|2300|1')],
[(u'3', u'|Inverter|2015-11-02|29000|1'),
(u'3', u'|Gas|2014-01-09|2300|1'),
(u'3', u'|Television|2018-01-17|45000|2')],
[(u'4', u'|Gas|2018-01-17|2300|1'),
(u'4', u'|Television$$|2018-01-17|45000|2')]
]
ниже вы можете проверить количество записей в каждом разделе.
from pyspark.sql.functions import desc
from pyspark.sql.functions import spark_partition_id
partitionSizes = newpairRDD.glom().map(len).collect();
[4, 5, 3, 3, 2]
Обратите внимание, что при создании пары RDD пары ключ-значение ключ должен иметь тип int, иначе вы получите сообщение об ошибке.
Надеюсь, это поможет!