Spark RDD раздел по ключу эксклюзивным способом - PullRequest
0 голосов
/ 20 ноября 2018

Я хотел бы разделить СДР по ключу и иметь, чтобы каждый раздел содержал только значения одного ключа. Например, если у меня есть 100 различных значений ключа и I repartition(102), в СДР должно быть 2 пустых раздела и 100 разделов, каждый из которых содержит одно значение ключа.

Я пробовал с groupByKey(k).repartition(102), но это не гарантирует исключительность ключа в каждом разделе, так как я вижу некоторые разделы, содержащие больше значений одного ключа и более 2 пустых.

Есть ли способ в стандартном API сделать это?

Ответы [ 2 ]

0 голосов
/ 27 ноября 2018

для использования partitionBy () СДР должен состоять из кортежа (пары) объектов.Давайте посмотрим на пример ниже:

Предположим, у меня есть входной файл со следующими данными:

OrderId|OrderItem|OrderDate|OrderPrice|ItemQuantity
1|Gas|2018-01-17|1895|1
1|Air Conditioners|2018-01-28|19000|3
1|Television|2018-01-11|45000|2
2|Gas|2018-01-17|1895|1
2|Air Conditioners|2017-01-28|19000|3
2|Gas|2016-01-17|2300|1
1|Bottle|2018-03-24|45|10
1|Cooking oil|2018-04-22|100|3
3|Inverter|2015-11-02|29000|1
3|Gas|2014-01-09|2300|1
3|Television|2018-01-17|45000|2
4|Gas|2018-01-17|2300|1
4|Television$$|2018-01-17|45000|2
5|Medicine|2016-03-14|23.50|8
5|Cough Syrup|2016-01-28|190|1
5|Ice Cream|2014-09-23|300|7
5|Pasta|2015-06-30|65|2

PATH_TO_FILE="file:///u/vikrant/OrderInputFile"

чтение файла в RDD и пропуск заголовка

RDD = sc.textFile(PATH_TO_FILE)
header=RDD.first();
newRDD = RDD.filter(lambda x:x != header)

Теперь давайте вернемся-разделение СДР на «5» разделов

partitionRDD = newRDD.repartition(5)

позволяет взглянуть, как данные распределяются в этих «5» разделах

print("Partitions structure: {}".format(partitionRDD.glom().collect()))

здесь вы можете видеть, что данные записываются вдва раздела и три из них пусты, а также он не распределен равномерно.

Partitions structure: [[], 
[u'1|Gas|2018-01-17|1895|1', u'1|Air Conditioners|2018-01-28|19000|3', u'1|Television|2018-01-11|45000|2', u'2|Gas|2018-01-17|1895|1', u'2|Air Conditioners|2017-01-28|19000|3', u'2|Gas|2016-01-17|2300|1', u'1|Bottle|2018-03-24|45|10', u'1|Cooking oil|2018-04-22|100|3', u'3|Inverter|2015-11-02|29000|1', u'3|Gas|2014-01-09|2300|1'], 
[u'3|Television|2018-01-17|45000|2', u'4|Gas|2018-01-17|2300|1', u'4|Television$$|2018-01-17|45000|2', u'5|Medicine|2016-03-14|23.50|8', u'5|Cough Syrup|2016-01-28|190|1', u'5|Ice Cream|2014-09-23|300|7', u'5|Pasta|2015-06-30|65|2'], 
[], []]

Нам необходимо создать пару СДР, чтобы данные СДР были распределены равномерно по количеству разделов.Позволяет создать пару СДР и разбить ее на пару ключ-значение.

pairRDD = newRDD.map(lambda x :(x[0],x[1:]))

теперь позволяет перераспределить этот rdd в раздел «5» и распределить данные равномерно по разделам, используя ключ в [0] -й позиции.

newpairRDD = pairRDD.partitionBy(5,lambda k: int(k[0]))

теперь мы можем видеть, что данные распределяются равномерно в соответствии с соответствующими парами ключ-значение.

print("Partitions structure: {}".format(newpairRDD.glom().collect()))
Partitions structure: [
[(u'5', u'|Medicine|2016-03-14|23.50|8'), 
(u'5', u'|Cough Syrup|2016-01-28|190|1'), 
(u'5', u'|Ice Cream|2014-09-23|300|7'), 
(u'5', u'|Pasta|2015-06-30|65|2')],

[(u'1', u'|Gas|2018-01-17|1895|1'), 
(u'1', u'|Air Conditioners|2018-01-28|19000|3'), 
(u'1', u'|Television|2018-01-11|45000|2'), 
(u'1', u'|Bottle|2018-03-24|45|10'), 
(u'1', u'|Cooking oil|2018-04-22|100|3')], 

[(u'2', u'|Gas|2018-01-17|1895|1'), 
(u'2', u'|Air Conditioners|2017-01-28|19000|3'), 
(u'2', u'|Gas|2016-01-17|2300|1')], 

[(u'3', u'|Inverter|2015-11-02|29000|1'), 
(u'3', u'|Gas|2014-01-09|2300|1'), 
(u'3', u'|Television|2018-01-17|45000|2')], 

[(u'4', u'|Gas|2018-01-17|2300|1'), 
(u'4', u'|Television$$|2018-01-17|45000|2')]
]

ниже вы можете проверить количество записей в каждом разделе.

from pyspark.sql.functions import desc
from pyspark.sql.functions import spark_partition_id

partitionSizes = newpairRDD.glom().map(len).collect();

[4, 5, 3, 3, 2]

Обратите внимание, что при создании пары RDD пары ключ-значение ключ должен иметь тип int, иначе вы получите сообщение об ошибке.

Надеюсь, это поможет!

0 голосов
/ 20 ноября 2018

Для СДР вы пытались использовать partitionBy для разбиения СДР по ключу, как в этот вопрос ? Вы можете указать количество разделов, которое будет количеством ключей, чтобы избавиться от пустых разделов, если это необходимо.

В API набора данных вы можете использовать перераспределение с Column в качестве аргумента для разделения по значениям в этом столбце (хотя обратите внимание, что при этом используется значение spark.sql.shuffle.partitions в качестве числа разделы, так что вы получите гораздо больше пустых разделов).

...