PySpark: создание разделов при чтении двоичного файла с использованием функции binaryFiles () - PullRequest
0 голосов
/ 29 мая 2018
sc = SparkContext("Local")
rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).partitionBy(8)

или

sc = SparkContext("Local")
rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).repartition(8)

Используя любой из приведенных выше кодов, я пытаюсь сделать 8 разделов в моем СДР, где я хочу, чтобы данные распределялись равномерно по всемперегородки} .Когда я печатаю {rdd.getNumPartitions ()} , количество отображаемых разделов составляет только 8, но на Spark UI я заметил, что хотя сделано 8 разделов, но всевсе данные двоичного файла помещаются только в один раздел.

Примечание: minPartition атрибут не работает.Даже после установки minPartitions = 5 количество разделов, выполненных в RDD, составляет только 1.Таким образом, используются функции partitionBy / repartition

Это желаемое поведение или я что-то упустил?

Ответы [ 2 ]

0 голосов
/ 22 июля 2018

Spark 2.4 + , проблема должна быть решена, см. Комментарий @ Rahul ниже этого ответа.

Spark 2.1-2.3 , аргумент minPartitionsbinaryFiles() игнорируется.См. Spark-16575 и изменения фиксации для функции setMinPartitions () .Обратите внимание, что в коммите изменяется, как minPartitions больше не используется в функции!

Если вы читаете несколько двоичных файлов с binaryFiles(), входные файлы будут объединены в разделы на основе следующего:

  • spark.files.maxPartitionBytes, по умолчанию 128 МБ
  • spark.files.openCostInBytes, по умолчанию 4 МБ
  • spark.default.parallelism
  • общий размер вашего ввода

Первые три элемента конфигурации описаны здесь .См. Изменение фиксации выше, чтобы увидеть фактические вычисления.

У меня был сценарий, в котором я хотел максимум 40 МБ на входной раздел, следовательно, 40 МБ на задачу ... чтобы увеличить параллелизм при разборе.(Spark помещал 128 МБ в каждый раздел, замедляя работу моего приложения.) Я установил spark.files.maxPartitionBytes на 40 М перед вызовом binaryFiles():

spark = SparkSession \
   .builder \
   .config("spark.files.maxPartitionBytes", 40*1024*1024)

Только для одного входного файла ответ @ user9864979 правильный: один файл не может быть разбит на несколько разделов с помощью binaryFiles().


При чтении нескольких файлов с Spark 1.6 аргумент minPartitions работает, и вы должны его использовать.В противном случае вы столкнетесь с проблемой Spark-16575 : все ваши входные файлы будут считаны только в два раздела!

Вы обнаружите, что Spark обычно дает вамменьше входных разделов, чем вы запрашиваете.У меня был сценарий, где я хотел один входной раздел для каждых двух входных двоичных файлов.Я обнаружил, что установка minPartitions на «# входных файлов * 7/10» дала мне примерно то, что я хотел.
У меня был другой сценарий, где я хотел один входной раздел для каждого входного файла.Я обнаружил, что установка minPartitions на «количество входных файлов * 2» дала мне то, что я хотел.

Spark 1.5 поведение binaryFiles(): вы получаете один раздел для каждого вводафайл.

0 голосов
/ 29 мая 2018

TL; DR Это ожидаемое поведение.

Поскольку вы читаете файл с binaryFiles, все содержимое файла загружается как одна запись, и отдельные записи не могут быть разделенычерез несколько разделов.Здесь просто нечего распространять.

...