Spark DataFrame против CreateDataFrame - PullRequest
0 голосов
/ 13 января 2019

All

У нас есть много файлов в ионном формате, которые мы должны сделать некоторые вычисления для данных, прежде чем сгруппировать и отфильтровать их (их общий размер составляет около 1,3 ТБ, каждый файл составляет около 200–300 МБ). Я пытался сделать эти 2 разных способа.

Сначала нужно распараллелить список файлов из S3, который мне нужен, и преобразовать его в RDD [Row]. Затем создайте фреймы данных из него.

val rdd = ss.sparkContext.parallelize(suspendedList, suspendedList.size).flatMap(chunk => {
  Ranger.consumeStreamToRow(chunk, dfSchema.value)
})
val df = ss.createDataFrame(rdd, schema)

Второй способ - распараллелить список файлов и записать его в файлы паркета в S3 с помощью AvroParquetWriter. Загрузите его обратно как dataFrame

val rdd = ss.sparkContext.parallelize(suspendedList, suspendedList.size).foreach( chunk => {
  Ranger.writeParquetFile(chunk, avroSchema.value, TaskContext.getPartitionId())
})

Второй способ оказался намного быстрее первого, хотя он должен писать и читать с S3. Одна вещь, на которую я обращаю внимание, - это то, что первый метод, по-видимому, имеет очень много времени для сбора мусора по сравнению со следующим, что, я думаю, способствует более длительному времени, необходимому для фактической обработки данных.

Может кто-нибудь объяснить, что происходит с первым и вторым методами, и почему второй метод оказался намного быстрее, хотя я читаю и пишу в S3? (Я установил для памяти исполнителя 10 ГБ, а для драйвера - 40 ГБ, и я использую Spark EMR)

1 Ответ

0 голосов
/ 13 января 2019

На основании ваших комментариев я понимаю, что первый подход читает файл и создает List[Row], а второй подход использует искру для чтения файлов. Исходя из этого (поправьте меня, если я ошибаюсь), я объясню.

Объект RDD имеет только информацию о «как и где читать данные» , а не фактические данные (вроде потоков или генераторов питона). Таким образом, чтобы создать RDD, spark перечислит все файлы в каталоге s3 и (обычно) создаст раздел для каждого файла и поместит все это в объект RDD. Пока фактические данные еще даже не прочитаны / загружены.
Дополнительное преобразование (например, фильтр, карта) в этом СДР просто создает больше СДР (с предыдущим СДР в качестве источника) и вычислений, которые необходимо выполнить. Ничего еще не рассчитано.

Только когда выполняется окончательное Действие (приемник), например, save, collect, count, фактические данные читаются, и каждый раздел читается отдельным работником (возможно, на другой физической машине). ).
Таким образом, в любой момент времени одному компьютеру (работнику) потребуется лишь небольшая часть данных в памяти.

Если при чтении файла файл читается построчно, чтобы сначала создать List[String], каждая строка в файле становится одним значением в списке, метод map в этом списке используется для преобразования каждого объекта String. Роу объект. И все это находится в памяти процесса с одним драйвером (мастер) практически без параллелизма. Следовательно, очень медленный, требует много памяти (и сборки мусора).

Подводя итог, поймите, какая часть вашего кода выполняется в процессе драйвера (только одна для каждого приложения) и какие части выполняются в рабочих (приложение spark может иметь несколько рабочих процессов на разных компьютерах). И перенести свои расчеты на рабочих.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...