Исключение Spark FileAlreadyExistsException при сбое этапа при записи файла JSON - PullRequest
1 голос
/ 09 июля 2020

Я пытаюсь записать фрейм данных в местоположение s3 в формате JSON. Но всякий раз, когда задача исполнителя терпит неудачу и Spark повторяет этап, он выдает FileAlreadyExistsException.

A аналогичный вопрос задавался раньше, но он обращается к файлам OR C с отдельной искрой conf и не решает мою проблему.

Это мой код:

val result = spark.sql(query_that_OOMs_executor)
result.write.mode(SaveMode.Overwrite).json(s3_path)

В пользовательском интерфейсе искры ошибка на исполнителе говорит:

ExecutorLostFailure (executor 302 exited caused by one of the running tasks) 
Reason: Container killed by YARN for exceeding memory limits. 4.5 GB of 4.5 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.

Но Трассировка стека драйверов говорит:

Job aborted due to stage failure: Task 1344 in stage 2.0 failed 4 times, most recent failure: Lost task 1344.3 in stage 2.0 (TID 25797, executor.ec2.com, executor 217): org.apache.hadoop.fs.FileAlreadyExistsException: s3://prod-bucket/application_1590774027047/-650323473_1594243391573/part-01344-dc971661-93ef-4abc-8380-c000.json already exists

Как сделать так, чтобы искра пыталась перезаписать этот JSON файл? Таким образом, я получу реальную причину на драйвере, когда все 4 попытки завершатся неудачно. Я уже установил режим перезаписи, так что это не помогает.

1 Ответ

2 голосов
/ 16 июля 2020

Эта проблема возникла из-за фундаментальной проблемы с DirectFileOutputCommitter, который использовался здесь по умолчанию.

Здесь есть две вещи: OOM исполнителя и затем FileAlreadyExistsException при повторных попытках, вызывающих повторные попытки (и, следовательно, запрос SQL) не удастся.

Причина: DirectFileOutputCommitter попытается записать выходные файлы в одной попытке задачи в конечный выходной путь. Он сделает это, записав в промежуточный каталог, а затем переименовав его в окончательный путь и удалив оригинал. Это плохо и подвержено несоответствиям и ошибкам, а также не рекомендуется Spark.

Вместо этого я использовал коммиттер Netflix S3 , который делал бы это в многочастной манере. Сначала он запишет файлы на локальный диск, затем во время фиксации задачи каждый из них будет загружен в S3 по частям, но не будет сразу виден, затем во время фиксации задания (что произойдет только тогда, когда все задачи будут выполнены. успешно, так что это безопасная операция) данные локального диска будут удалены и загрузка будет завершена (теперь данные будут видны на S3). Это предотвращает неудачные задачи, напрямую записывающие данные в S3, и, следовательно, избегает FileAlreadyExistsException при повторной попытке.

Теперь для OOM исполнителя - они все еще выполняются для моего запроса, но повторные попытки успешны, которые также терпели неудачу ранее с DirectFileOutputCommitter.

Чтобы решить эту проблему, я в основном сделал

set spark.sql.sources.outputCommitterClass=com.netflix.s3.S3DirectoryOutputCommitter;
...