В этом подходе нет ничего плохого, и он работает абсолютно нормально в большинстве случаев использования, но могут возникнуть некоторые проблемы из-за способа записи файлов S3.
Две важные концепции для Понимание
S3 (хранилище объектов)! = Файловая система POSIX: Операция переименования:
Процесс переименования файлов в файловой системе на основе POSIX является операцией только с метаданными. Изменяется только указатель, и файл остается на диске как есть. Например, у меня есть файл ab c .txt, и я хочу переименовать его в xyz.txt его мгновенный и атомный c. Последняя измененная временная метка xyz.txt остается такой же, как и последняя измененная временная метка ab c .txt. Где, как в AWS S3 (хранилище объектов), переименование файла под капотом является копией с последующей операцией удаления. Исходный файл сначала копируется в место назначения, а затем удаляется исходный файл. Поэтому «aws s3 mv» изменяет последнюю измененную временную метку файла назначения в отличие от файловой системы POSIX. Метаданные здесь - это хранилище значений ключей, где ключ - это файл. путь и значение - это содержимое файла, и нет такого процесса, как изменение ключа и выполнение этого немедленно. Процесс переименования зависит от размера файла. Если существует переименование директории (в S3 нет ничего, что называется директорией, для простоты мы можем принять набор файлов в качестве директории для повторного получения), то это зависит от количества файлов внутри директории и размера каждого файла. Таким образом, в двух словах переименование является очень дорогой операцией в S3 по сравнению с обычной файловой системой.
Модель согласованности S3
S3 поставляется с 2 видами непротиворечивости a.read после записи b.eventual непротиворечивость и что в некоторых случаях приводит к тому, что файл не найден в ожидании. Файлы добавляются и не отображаются в списке, либо файлы удаляются или не удаляются из списка.
Deep explanation:
Spark использует реализации Hadoop FileOutputCommitter для записи данных. Повторная запись данных включает в себя несколько этапов и подготовку к выходным файлам высокого уровня, а затем их фиксацию, то есть написание конечных файлов. Вот шаг переименования, как я говорил ранее, от этапа подготовки к последнему этапу. Как вы знаете, задание на искру разделено на несколько этапы и набор задач, а также из-за характера распределенных вычислений задачи подвержены сбоям, поэтому есть также возможность перезапустить одну и ту же задачу из-за сбоя системы или спекулятивного выполнения медленно выполняющихся задач, что приводит к концепции фиксации и задания задачи Функции коммитов. Здесь у нас есть 2 варианта доступных алгоритмов и то, как выполняются коммиты работы и задачи, и, сказав, что ни один алгоритм не лучше другого, а зависит от того, где мы фиксируем данные.
mapreduce.fileoutputcommitter.algorithm.version=1
commitTask переименовывает данные, сгенерированные задачей, из временного каталога задачи во временный каталог задания.
Когда все задачи выполнены полный commitJob переименовать все данные из задание временного каталога до конечного пункта назначения и в конце создает файл _SUCCESS.
Здесь драйвер выполняет работу commitJob в конце, поэтому хранение объектов, таких как S3, может занять больше времени из-за большого количества задач временный файл ставится в очередь для операции переименования (хотя он не последовательный), и производительность записи не оптимизируется. Это может работать довольно хорошо для HDFS, поскольку переименование не дорого и просто изменение метаданных. Для AWS S3 во время выполнения каждой операции переименования файлов открывает огромное количество вызовов API на AWS S3 и может вызвать проблемы неожиданного закрытия вызовов API, если число файлов велико. Это не может также. Я видел оба случая в одном и том же задании, выполняющемся в два разных раза.
mapreduce.fileoutputcommitter.algorithm.version=2
commitTask перемещает данные, сгенерированные заданием, из Временный каталог задачи непосредственно к конечному месту назначения, как только задача будет выполнена.
commitJob в основном записывает файл _SUCCESS и ничего не делает.
На высоком уровне это выглядит оптимизированным, но оно имеет ограничение, заключающееся в невозможности выполнения спекулятивной задачи, а также в случае сбоя какой-либо задачи из-за поврежденных данных, тогда мы можем в конечном итоге получить остаточные данные в конечном месте назначения и потребовать очистки , Таким образом, этот алгоритм не дает 100% -ной корректности данных или не работает для случаев использования, когда нам нужны данные в режиме добавления к существующим файлам. Даже если это гарантирует, что оптимизированные результаты сопряжены с риском. Причина хорошей производительности в основном из-за меньшее количество операций переименования по сравнению с алгоритмом 1 (все еще есть переименования). Здесь мы можем столкнуться с проблемами, когда файл не найден, потому что commitTask записывает файл во временный путь и сразу же переименовывает его, и есть небольшая вероятность возможных проблем с согласованностью.
Best Practices
Вот некоторые из них, которые, я думаю, мы можем использовать при написании приложений для обработки искровых данных:
Если у вас есть доступный кластер HDFS, запишите данные из Spark в HDFS и скопируйте его на S3, чтобы сохранить. s3-dist-cp можно оптимально использовать для копирования данных из HDFS в S3. Здесь можно избежать всей этой операции переименования. При AWS EMR выполняется только на время вычислений, а затем прекращается для сохранения результата, такой подход выглядит предпочтительным.
Старайтесь избегать записи файлов и читать их снова и снова, если только у них нет потребителей для файлов, и Spark хорошо известна благодаря обработке в памяти и тщательному сохранению данных / хранению в кэш-памяти. помогите оптимизировать время выполнения приложения.