Какова лучшая практика записи большого количества файлов в s3 с помощью Spark? - PullRequest
0 голосов
/ 06 апреля 2020

Я пытаюсь записать около 30k-60k паркетных файлов на s3 с помощью Spark, и это занимает огромное количество времени (40+ минут) из-за ограничения скорости s3. Интересно, есть ли лучшая практика, чтобы делать такие вещи. Я слышал, что запись данных в HDFS и последующее копирование с использованием s3-dist-cp может быть быстрее. Я не могу понять почему. не копирование из HDFS займет столько же времени из-за ограничения скорости s3?

Спасибо за помощь

1 Ответ

0 голосов
/ 06 апреля 2020

В этом подходе нет ничего плохого, и он работает абсолютно нормально в большинстве случаев использования, но могут возникнуть некоторые проблемы из-за способа записи файлов S3.

Две важные концепции для Понимание

  1. S3 (хранилище объектов)! = Файловая система POSIX: Операция переименования:

    Процесс переименования файлов в файловой системе на основе POSIX является операцией только с метаданными. Изменяется только указатель, и файл остается на диске как есть. Например, у меня есть файл ab c .txt, и я хочу переименовать его в xyz.txt его мгновенный и атомный c. Последняя измененная временная метка xyz.txt остается такой же, как и последняя измененная временная метка ab c .txt. Где, как в AWS S3 (хранилище объектов), переименование файла под капотом является копией с последующей операцией удаления. Исходный файл сначала копируется в место назначения, а затем удаляется исходный файл. Поэтому «aws s3 mv» изменяет последнюю измененную временную метку файла назначения в отличие от файловой системы POSIX. Метаданные здесь - это хранилище значений ключей, где ключ - это файл. путь и значение - это содержимое файла, и нет такого процесса, как изменение ключа и выполнение этого немедленно. Процесс переименования зависит от размера файла. Если существует переименование директории (в S3 нет ничего, что называется директорией, для простоты мы можем принять набор файлов в качестве директории для повторного получения), то это зависит от количества файлов внутри директории и размера каждого файла. Таким образом, в двух словах переименование является очень дорогой операцией в S3 по сравнению с обычной файловой системой.

  2. Модель согласованности S3

    S3 поставляется с 2 видами непротиворечивости a.read после записи b.eventual непротиворечивость и что в некоторых случаях приводит к тому, что файл не найден в ожидании. Файлы добавляются и не отображаются в списке, либо файлы удаляются или не удаляются из списка.

Deep explanation:

Spark использует реализации Hadoop FileOutputCommitter для записи данных. Повторная запись данных включает в себя несколько этапов и подготовку к выходным файлам высокого уровня, а затем их фиксацию, то есть написание конечных файлов. Вот шаг переименования, как я говорил ранее, от этапа подготовки к последнему этапу. Как вы знаете, задание на искру разделено на несколько этапы и набор задач, а также из-за характера распределенных вычислений задачи подвержены сбоям, поэтому есть также возможность перезапустить одну и ту же задачу из-за сбоя системы или спекулятивного выполнения медленно выполняющихся задач, что приводит к концепции фиксации и задания задачи Функции коммитов. Здесь у нас есть 2 варианта доступных алгоритмов и то, как выполняются коммиты работы и задачи, и, сказав, что ни один алгоритм не лучше другого, а зависит от того, где мы фиксируем данные.

mapreduce.fileoutputcommitter.algorithm.version=1

  • commitTask переименовывает данные, сгенерированные задачей, из временного каталога задачи во временный каталог задания.

  • Когда все задачи выполнены полный commitJob переименовать все данные из задание временного каталога до конечного пункта назначения и в конце создает файл _SUCCESS.

Здесь драйвер выполняет работу commitJob в конце, поэтому хранение объектов, таких как S3, может занять больше времени из-за большого количества задач временный файл ставится в очередь для операции переименования (хотя он не последовательный), и производительность записи не оптимизируется. Это может работать довольно хорошо для HDFS, поскольку переименование не дорого и просто изменение метаданных. Для AWS S3 во время выполнения каждой операции переименования файлов открывает огромное количество вызовов API на AWS S3 и может вызвать проблемы неожиданного закрытия вызовов API, если число файлов велико. Это не может также. Я видел оба случая в одном и том же задании, выполняющемся в два разных раза.

mapreduce.fileoutputcommitter.algorithm.version=2

  • commitTask перемещает данные, сгенерированные заданием, из Временный каталог задачи непосредственно к конечному месту назначения, как только задача будет выполнена.

  • commitJob в основном записывает файл _SUCCESS и ничего не делает.

На высоком уровне это выглядит оптимизированным, но оно имеет ограничение, заключающееся в невозможности выполнения спекулятивной задачи, а также в случае сбоя какой-либо задачи из-за поврежденных данных, тогда мы можем в конечном итоге получить остаточные данные в конечном месте назначения и потребовать очистки , Таким образом, этот алгоритм не дает 100% -ной корректности данных или не работает для случаев использования, когда нам нужны данные в режиме добавления к существующим файлам. Даже если это гарантирует, что оптимизированные результаты сопряжены с риском. Причина хорошей производительности в основном из-за меньшее количество операций переименования по сравнению с алгоритмом 1 (все еще есть переименования). Здесь мы можем столкнуться с проблемами, когда файл не найден, потому что commitTask записывает файл во временный путь и сразу же переименовывает его, и есть небольшая вероятность возможных проблем с согласованностью.

Best Practices

Вот некоторые из них, которые, я думаю, мы можем использовать при написании приложений для обработки искровых данных:

  • Если у вас есть доступный кластер HDFS, запишите данные из Spark в HDFS и скопируйте его на S3, чтобы сохранить. s3-dist-cp можно оптимально использовать для копирования данных из HDFS в S3. Здесь можно избежать всей этой операции переименования. При AWS EMR выполняется только на время вычислений, а затем прекращается для сохранения результата, такой подход выглядит предпочтительным.

  • Старайтесь избегать записи файлов и читать их снова и снова, если только у них нет потребителей для файлов, и Spark хорошо известна благодаря обработке в памяти и тщательному сохранению данных / хранению в кэш-памяти. помогите оптимизировать время выполнения приложения.

...