Предварительные данные в разделе Spark, так что каждый раздел имеет непересекающиеся значения в столбце, на который мы делим раздел - PullRequest
0 голосов
/ 06 января 2019

Я пытаюсь предварительно разбить данные на части перед выполнением операции агрегации для определенного столбца моих данных. У меня есть 3 рабочих узла, и мне бы хотелось, чтобы у каждого раздела были непересекающиеся значения в столбце, на который я делю разделы. Я не хочу ситуаций, когда два раздела могут иметь одинаковые значения в столбце.

например. Если у меня есть следующие данные

ss_item_sk | ss_quantity
1          | 10.0
1          |  4.0
2          |  3.0
3          |  5.0
4          |  8.0
5          |  13.0
5          |  10.0

Тогда следующие разделы являются удовлетворительными:

раздел 1

ss_item_sk | ss_quantity
1          | 10.0
1          |  4.0

раздел 2

ss_item_sk | ss_quantity
2          |  3.0
3          |  5.0

раздел 3

ss_item_sk | ss_quantity
4          |  8.0
5          |  13.0
5          |  10.0

К сожалению, приведенный ниже код не работает.

spark.sqlContext.setConf( "spark.sql.shuffle.partitions", "3")
var json = spark.read.json("hdfs://master:9000/tpcds/store_sales")
var filtered = json.filter(row => row.getAs[Long]("ss_item_sk") < 180)
filtered.repartition($"ss_item_sk").write.json(savepath)

Я уже посмотрел на

и я до сих пор не могу понять.

1 Ответ

0 голосов
/ 07 января 2019

Перераспределение по ключу осуществляет распределение данных на основе ключа на уровне фрейма данных. При написании фрейма данных на hdfs это отдельная вещь. Вы можете попробовать

df.coalesce(1).write.partitionBy("ss_item_sk").json(savepath)

В этом сценарии вы также увидите несколько файлов деталей в разных каталогах, созданных разделенным столбцом. Количество записывающих / восстанавливающих устройств, которые будут работать, можно контролировать только на основе метода «partitionBy». Он очень похож на Map Reduce Partitioner, так как он контролирует количество запускаемых редукторов. Чтобы получить один файл на основе столбца раздела, необходимо выполнить эту команду.

df.repartition($"ss_item_sk").write.partitionBy("ss_item_sk").json(savepath)

Теперь это работает, поскольку редуктор сопоставляется с номером раздела исполнителя. Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...