Spark RDD: разбиение в соответствии с форматом текстового файла - PullRequest
0 голосов
/ 28 июня 2018

У меня есть текстовый файл, содержащий десятки гигабайт данных, которые мне нужно загрузить из HDFS и распараллелить как RDD. Этот текстовый файл описывает элементы в следующем формате. Обратите внимание, что буквенные строки отсутствуют (значение каждой строки неявно) и что каждая строка может содержать пробелы для разделения различных значений:

0001  (id)
1000 1000 2000 (dimensions)
0100           (weight)
0030           (amount)
0002  (id)
1110 1000 5000 (dimensions)
0220           (weight)
3030           (amount)

Я считаю, что самый промежуточный подход для распараллеливания этого файла - это загрузить его в HDFS из локальной файловой системы, а затем создать RDD, выполнив sc.textFile(filepath). Однако в этом случае разбиение будет зависеть от разбиений HDFS, соответствующих файлу.

Проблема указанного подхода заключается в том, что каждый раздел может содержать неполные элементы. Например:

Раздел 1

0001           (id)
1000 1000 2000 (dimensions)
0100           (weight)
0030           (amount)
0002           (id)
1110 1000 5000 (dimensions)

Раздел 2

0220           (weight)
3030           (amount)

Таким образом, когда мы вызываем метод для каждого раздела и передаем ему соответствующий блок данных, он получит неполную спецификацию для элемента, обозначенного как 0002. Это приведет к неверному выводу для вычислений, выполненных внутри вызываемого способ.

Какой самый эффективный способ разбить или переразметить этот RDD, чтобы избежать этой проблемы? Можно ли указать количество строк каждого раздела, кратное 4? Если да, то должен ли это делать Hadoop или Spark?

Ответы [ 2 ]

0 голосов
/ 01 июля 2018

Загрузите текстовый файл, чтобы получить RDD[String], затем используйте zipWithIndex для преобразования в RDD[(String, Long)], где второй атрибут в кортеже - это номер индекса элемента в СДР.

Сжатие этого СДР с индексами его элементов. Порядок сначала определяется на основе индекса раздела, а затем порядка элементов в каждом разделе. Таким образом, первый элемент в первом разделе получает индекс 0, а последний элемент в последнем разделе получает самый большой индекс.

  • Используя индекс в качестве номера строки (начиная с 0), мы можем сгруппировать строки, принадлежащие записи. например. [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...
  • Поскольку мы знаем, что каждая запись занимает (ровно) 4 строки, целочисленное деление индекса на 4 (давайте назовем это idx_div). Это приведет к тому, что первые четыре строки будут иметь значение 0 как idx_div, следующие четыре строки получат 1 как idx_div и так далее. например. [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, .... Это можно использовать для группировки всех (четырех) строк, принадлежащих одной записи, для дальнейшего анализа и обработки


case class Record(id:String, dimensions:String, weight:String, amount:String)
val lines = sc.textFile("...")
val records = lines
    .zipWithIndex
    .groupBy(line_with_idx => (line_with_idx._2 / 4))  // groupBy idx_div
    .map(grouped_record => {
        val (idx_div:Long, lines_with_idx:Iterable[(String, Long)]) = grouped_record
        val lines_with_idx_list = lines_with_idx.toList.sortBy(_._2)  // Additional check to ensure ordering
        val lines_list = lines_with_idx_list.map(_._1)
        val List(id:String, dimensions:String, weight:String, amount:String) = lines_list
        new Record(id, dimensions, weight, amount)
    })
0 голосов
/ 30 июня 2018

Почему бы просто не сгруппировать строки перед тем, как поместить файл в HDFS, чтобы избежать этой проблемы?

xargs -L4 echo < file
hdfs dfs -put file /your/path

Ваши данные будут выглядеть как

0001  1000  0100  0030 
0002  1110  0220  3030

Если вы это сделаете, вы можете читать свои данные с помощью Spark DataFrames API, который является более оптимальным чем RDD и предоставляет вам более богатый API и производительность для написания вашего приложения.

...