Сохранить порядок разделов Spark с сериализацией текстовых файлов - PullRequest
0 голосов
/ 07 апреля 2020

Spark может сериализовать набор данных строк в набор текстовых файлов, по одному для каждого раздела. Однако при чтении этих текстовых файлов, похоже, не соблюдается порядок разделов.

Скажем, у меня есть СДР myDataset из 8 элементов, разделенных на 4 раздела. Я могу сериализовать его:

scala> val myDataset = sc.parallelize(Array(0, 1, 2, 3, 4, 5, 6, 7), 4)
scala> myDataset.saveAsTextFile("/tmp/myDataset")

Он сериализуется следующим образом:

  • файл /tmp/myDataset/part-00000 содержит две строки: 0 и 1
  • Файл /tmp/myDataset/part-00001 содержит две строки: 2 и 3
  • Файл /tmp/myDataset/part-00002 содержит две строки: 4 и 5
  • Файл /tmp/myDataset/part-00003 содержит две строки: 6 и 7

Теперь, если я снова прочитаю свой набор данных:

scala> spark.read.textFile("/tmp/myDataset").take(8)
res43: Array[String] = Array(6, 7, 2, 3, 4, 5, 0, 1)

Я бы ожидал, что Spark прочитает разделы в том порядке, в котором они их записали, что сохранить порядок элементов в наборе данных.

Есть ли способ сохранить этот порядок?

1 Ответ

0 голосов
/ 07 апреля 2020

Кажется, это известная ошибка из-за того, что основная функция, используемая для вывода списка файлов в локальной файловой системе, File.listFiles(), не сортирует возвращаемое значение.

Один из обходных путей - реализовать выделенный FileInputFormat, который сортирует имена файлов:

public class OrderedTextInputFormat extends TextInputFormat {

    @Override
    protected List<FileStatus> listStatus(JobContext job) throws IOException  {
        List<FileStatus> files = super.listStatus(job);
        files.sort(new Comparator<FileStatus>() {

            @Override
            public int compare(FileStatus lhs, FileStatus rhs) {
                return lhs.getPath().compareTo(rhs.getPath());
            }

        });
        return files;
    }
}
...