Может ли Spark / EMR считывать данные с многопоточного s3 - PullRequest
2 голосов
/ 20 января 2020

Из-за некоторых неудачных последовательностей событий мы получили очень фрагментированный набор данных, хранящийся на s3. Метаданные таблицы хранятся в Glue, а данные записываются с помощью «bucketBy» и хранятся в формате паркета. Таким образом, обнаружение файлов не является проблемой, и количество разделов зажигания равно количеству сегментов, что обеспечивает хороший уровень параллелизма.

Когда мы загружаем этот набор данных в Spark / EMR, мы в конечном итоге загрузка каждого спарк-раздела около 8 тысяч файлов из s3.

Поскольку мы сохранили данные в столбчатом формате; согласно нашему сценарию использования, где нам нужна пара полей, мы на самом деле не читаем все данные, а очень небольшую часть того, что хранится.

Основываясь на использовании ЦП на рабочих узлах, я вижу, что каждая задача (запущенная на раздел) использует почти около 20% их ЦП, что, как я подозреваю, связано с тем, что на одну задачу на чтение задачи из файлов s3 последовательно, так много IOwait ...

Есть ли способ поощрить задачи зажигания на EMR к чтению данных из многопоточного s3, чтобы мы могли одновременно читать несколько файлов из s3 в пределах задача? Таким образом, мы можем использовать 80% бездействующего процессора, чтобы сделать все немного быстрее?

1 Ответ

2 голосов
/ 21 января 2020

Чтение данных S3 с помощью кадров данных Spark состоит из двух частей:

  1. Обнаружение (перечисление объектов на S3)
  2. Чтение объектов S3, включая распаковку, и т. Д. c .

Обнаружение обычно происходит в драйвере. В некоторых управляемых средах Spark есть оптимизации, которые используют ресурсы кластера для более быстрого обнаружения. Это, как правило, не проблема, если вы не получите более 100 000 объектов. Обнаружение происходит медленнее, если у вас есть .option("mergeSchema", true), поскольку каждый файл должен быть затронут, чтобы обнаружить его схему.

Чтение файлов S3 является частью выполнения действия. Параллелизм чтения min (количество разделов, количество доступных ядер). Больше разделов + больше доступных ядер означает более быстрый ввод-вывод ... в теории. На практике S3 может работать довольно медленно, если у вас нет регулярного доступа к этим файлам для S3, чтобы увеличить их доступность. Поэтому на практике дополнительный параллелизм Spark имеет убывающую отдачу. Наблюдайте за общей пропускной способностью RW сети на активное ядро ​​и настраивайте свое выполнение на самое высокое значение.

Количество разделов можно узнать с помощью df.rdd.partitions.length.

Существуют дополнительные действия, которые вы можете выполнить, если Пропускная способность ввода / вывода S3 низкая:

  1. Убедитесь, что данные на S3 разнесены, когда речь идет о его префиксе (см. https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html).

  2. Откройте запрос поддержки AWS и попросите увеличить префиксы с вашими данными.

  3. Поэкспериментируйте с различными типами узлов. Мы обнаружили, что оптимизированные для хранения узлы имеют более эффективный ввод / вывод.

Надеюсь, это поможет.

...