Как перезаписать / повторно использовать существующий выходной путь для заданий Hadoop и снова - PullRequest
21 голосов
/ 10 октября 2011

Я хочу перезаписать / повторно использовать существующий выходной каталог при ежедневном запуске задания Hadoop.На самом деле выходной каталог будет хранить итоговые результаты каждого дня выполнения задания.Если я указываю тот же выходной каталог, он выдает ошибку «выходной каталог уже существует».

Как обойти эту проверку?

Ответы [ 9 ]

14 голосов
/ 10 октября 2011

А как насчет удаления каталога перед запуском задания?

Вы можете сделать это через оболочку:

hadoop fs -rmr /path/to/your/output/

или через API Java:

// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
11 голосов
/ 10 октября 2011

Ответ Юнгблута - ваше прямое решение.Поскольку я никогда не доверяю автоматизированным процессам удаления материала (лично мне), я предложу альтернативу:

Вместо того, чтобы пытаться перезаписать, я предлагаю вам сделать имя вывода вашей работы динамическим, включая время, в котороеон бежал.

Что-то вроде "/path/to/your/output-2011-10-09-23-04/".Таким образом, вы можете сохранить прежние выходные данные на случай, если вам когда-нибудь понадобится вернуться к ним. В моей системе, которая запускает более 10 ежедневных заданий, мы структурируем выходные данные так: /output/job1/2011/10/09/job1out/part-r-xxxxx, /output/job1/2011/10/10/job1out/part-r-xxxxx и т. Д.

5 голосов
/ 05 августа 2014

Hadoop's TextInputFormat (который, я полагаю, вы используете) не позволяет перезаписать существующий каталог. Вероятно, извините вас за боль от того, что вы ошибочно удалили что-то, над чем вы (и ваш кластер) работали очень усердно.

Однако, если вы уверены, что хотите, чтобы ваша выходная папка была перезаписана заданием, я считаю, что самый простой способ - изменить TextOutputFormat примерно так:

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
      public RecordWriter<K, V> 
      getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException 
      {
          Configuration conf = job.getConfiguration();
          boolean isCompressed = getCompressOutput(job);
          String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
          CompressionCodec codec = null;
          String extension = "";
          if (isCompressed) 
          {
              Class<? extends CompressionCodec> codecClass = 
                      getOutputCompressorClass(job, GzipCodec.class);
              codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
              extension = codec.getDefaultExtension();
          }
          Path file = getDefaultWorkFile(job, extension);
          FileSystem fs = file.getFileSystem(conf);
          FSDataOutputStream fileOut = fs.create(file, true);
          if (!isCompressed) 
          {
              return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
          } 
          else 
          {
              return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
          }
      }
}

Теперь вы создаете FSDataOutputStream (fs.create(file, true)) с overwrite = true.

1 голос
/ 15 апреля 2015

Hadoop уже поддерживает эффект, которого вы, похоже, пытаетесь достичь, разрешив несколько путей ввода для задания. Вместо того, чтобы пытаться иметь один каталог файлов, в который вы добавляете больше файлов, есть каталог каталогов, в который вы добавляете новые каталоги. Чтобы использовать совокупный результат в качестве входных данных, просто укажите глобус ввода как подстановочный знак над подкаталогами (например, my-aggregate-output/*). Чтобы «добавить» новые данные в агрегат в качестве выходных данных, просто укажите новый уникальный подкаталог агрегата в качестве выходного каталога, обычно используя временную метку или некоторый порядковый номер, полученный из ваших входных данных (например, my-aggregate-output/20140415154424).

0 голосов
/ 16 июля 2019

Я столкнулся именно с этой проблемой, она проистекает из исключения, возникшего в checkOutputSpecs в классе FileOutputFormat.В моем случае мне хотелось иметь много заданий, добавляющих файлы в уже существующие каталоги, и я гарантировал, что файлы будут иметь уникальные имена.

Я решил это, создав класс выходного формата, который переопределяет только checkOutputSpecsметод и задыхается (игнорирует) FileAlreadyExistsException, который выдается, где он проверяет, существует ли каталог.

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public void checkOutputSpecs(JobContext job) throws IOException {
        try {
            super.checkOutputSpecs(job);
        }catch (FileAlreadyExistsException ignored){
            // Suffocate the exception
        }
    }
}

И в конфигурации задания я использовал LazyOutputFormat, а также MultipleOutputs.

LazyOutputFormat.setOutputFormatClass(job, OverwriteTextOutputFormat.class);
0 голосов
/ 28 января 2019

Hadoop следует философии Однократная запись, многократное чтение. Таким образом, когда вы пытаетесь записать в каталог снова, он предполагает, что должен создать новый (однократная запись), но он уже существует, и так что жалуется. Вы можете удалить его через hadoop fs -rmr /path/to/your/output/. Для сохранения данных лучше создать динамический каталог (например, на основе метки времени или значения хеша).

0 голосов
/ 16 октября 2018

Если кто-то загружает входной файл (например, с добавленными записями) из локальной файловой системы в распределенную файловую систему hadoop следующим образом:

hdfs dfs -put  /mylocalfile /user/cloudera/purchase

Тогда можно также перезаписать / повторно использовать существующий выходной каталогс -f.Нет необходимости удалять или заново создавать папку

hdfs dfs -put -f  /updated_mylocalfile /user/cloudera/purchase
0 голосов
/ 07 сентября 2018

У меня был похожий вариант использования, для решения этой проблемы я использую MultipleOutputs.

Например, если я хочу, чтобы разные задания MapReduce записывали в один и тот же каталог /outputDir/.Задание 1 записывается в /outputDir/job1-part1.txt, задание 2 записывается в /outputDir/job1-part2.txt (без удаления выходных файлов).

В основном, установите выходной каталог в случайный (его можно удалить до запуска нового задания).)

FileInputFormat.addInputPath(job, new Path("/randomPath"));

В редукторе / маппере используйте MultipleOutputs и настройте устройство записи на запись в нужный каталог:

public void setup(Context context) {
    MultipleOutputs mos = new MultipleOutputs(context);
}

и:

mos.write(key, value, "/outputDir/fileOfJobX.txt")

Однако мой вариант использования был немного сложнее, чем этот.Если это просто запись в один и тот же плоский каталог, вы можете записать в другой каталог и запустить скрипт для переноса файлов, например: hadoop fs -mv /tmp/* /outputDir

В моем случае каждое задание MapReduce записывает в разные подпрограммы.-каталоги, основанные на значении написанного сообщения.Структура каталогов может быть многослойной, например:

/outputDir/
    messageTypeA/
        messageSubTypeA1/
            job1Output/
                job1-part1.txt
                job1-part2.txt
                ...
            job2Output/
                job2-part1.txt
                ...

        messageSubTypeA2/
        ...
    messageTypeB/
    ...

Каждое задание Mapreduce может записывать в тысячи подкаталогов.И стоимость записи в каталог tmp и перемещения каждого файла в правильный каталог высока.

0 голосов
/ 06 августа 2016

Вы можете создать выходной подкаталог для каждого выполнения по времени.Например, предположим, что вы ожидаете выходной каталог от пользователя, а затем установите его следующим образом:

FileOutputFormat.setOutputPath(job, new Path(args[1]);

Измените его следующим образом:

String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis()));
FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...