Связывание нескольких заданий MapReduce в Hadoop - PullRequest
118 голосов
/ 23 марта 2010

Во многих реальных ситуациях, когда вы применяете MapReduce, окончательные алгоритмы заканчиваются несколькими шагами MapReduce.

т.е. Map1, Reduce1, Map2, Reduce2 и т. Д.

Таким образом, у вас есть выходные данные последнего снижения, которые необходимы в качестве входных данных для следующей карты.

Промежуточные данные - это то, что вы (в общем) не хотите хранить после успешного завершения конвейера. Кроме того, поскольку эти промежуточные данные, как правило, представляют собой некоторую структуру данных (например, «карту» или «набор»), вы не хотите прилагать слишком много усилий для написания и чтения этих пар ключ-значение.

Каков рекомендуемый способ сделать это в Hadoop?

Есть ли (простой) пример, показывающий, как правильно обрабатывать эти промежуточные данные, включая последующую очистку?

Ответы [ 13 ]

53 голосов
/ 25 марта 2010

Я думаю, что это руководство в сети разработчиков Yahoo поможет вам в этом: Работа в цепочке

Вы используете JobClient.runJob().Путь вывода данных из первого задания становится входным путем ко второму заданию.Они должны быть переданы в качестве аргументов для ваших заданий с соответствующим кодом для их анализа и настройки параметров для задания.

Я думаю, что описанный выше метод может, однако, быть таким же образом, как это было сделано в более старом API Mapred, но это все равно должно работать.В новом API mapreduce будет аналогичный метод, но я не уверен, что это такое.

Что касается удаления промежуточных данных после завершения задания, вы можете сделать это в своем коде.То, как я делал это раньше, использует что-то вроде:

FileSystem.delete(Path f, boolean recursive);

Где путь - это местоположение в HDFS данных.Вам необходимо убедиться, что вы удаляете эти данные только в том случае, если никакая другая работа не требует этого.

18 голосов
/ 03 июля 2010

Есть много способов сделать это.

(1) Каскадные задания

Создайте объект JobConf "job1" для первого задания и установите всепараметры с «input» в качестве входного каталога и «temp» в качестве выходного каталога.Выполните это задание:

JobClient.run(job1).

Сразу под ним создайте объект JobConf "job2" для второго задания и задайте все параметры с помощью "temp" в качестве входного каталога и "output" в качестве выходного каталога.Выполните это задание:

JobClient.run(job2).

(2) Создайте два объекта JobConf и задайте все параметры в них, как (1) за исключением того, что вы не используете JobClient.run.

Затем создайте два объекта Job с параметрами jobconfs:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2);

Используя объект jobControl, вы задаете зависимости задания и затем запускаетевакансии:

JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();

(3) Если вам нужна структура, похожая на Map + |Уменьшить |Map *, вы можете использовать классы ChainMapper и ChainReducer, которые поставляются с Hadoop версии 0.19 и выше.

Cheers

7 голосов
/ 26 марта 2010

Есть несколько способов сделать это.Я сосредоточусь на двух.

Один из них через Riffle (http://github.com/cwensel/riffle) - библиотека аннотаций для идентификации зависимых вещей и «выполнения» их в зависимости (топологическом) порядке.

Или вы можете использовать Cascade (и MapReduceFlow) в Cascading (http://www.cascading.org/).В будущей версии будут поддерживаться аннотации Riffle, но теперь она отлично работает с необработанными заданиями MR JobConf.

Вариант этого заключается в том, чтобы вообще не управлять заданиями MR вручную, а разрабатывать свое приложение с помощью каскадного API.Затем JobConf и цепочка заданий обрабатываются внутри с помощью каскадного планировщика и классов Flow.

Таким образом, вы тратите свое время на то, чтобы сосредоточиться на своей проблеме, а не на механике управления заданиями Hadoop и т. Д. Вы даже можете наложить разные языкисверху (например, clojure или jruby), чтобы еще больше упростить разработку и приложения.http://www.cascading.org/modules.html

6 голосов
/ 15 октября 2014

Я выполнил цепочку заданий, используя объекты JobConf один за другим. Я взял пример WordCount для цепочки заданий. Одна работа вычисляет, сколько раз слово повторяется в данном выводе. Второе задание принимает выходные данные первого задания и вычисляет общее количество слов в заданном вводе. Ниже приведен код, который необходимо поместить в класс Driver.

    //First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class);
    job1.setJobName("WordCount");

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    job1.setMapperClass(WordCountMapper.class);
    job1.setCombinerClass(WordCountReducer.class);
    job1.setReducerClass(WordCountReducer.class);

    job1.setInputFormat(TextInputFormat.class);
    job1.setOutputFormat(TextOutputFormat.class);

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
    FileInputFormat.setInputPaths(job1, new Path("input_data"));

    //"first_job_output" contains data that how many times a word occurred in the given file
    //This will be the input to the second job. For second job, input data name should be
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));

    JobClient.runJob(job1);


    //Second Job - Counts total number of words in a given file

    JobConf job2 = new JobConf(TotalWords.class);
    job2.setJobName("TotalWords");

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    job2.setMapperClass(TotalWordsMapper.class);
    job2.setCombinerClass(TotalWordsReducer.class);
    job2.setReducerClass(TotalWordsReducer.class);

    job2.setInputFormat(TextInputFormat.class);
    job2.setOutputFormat(TextOutputFormat.class);

    //Path name for this job should match first job's output path name
    FileInputFormat.setInputPaths(job2, new Path("first_job_output"));

    //This will contain the final output. If you want to send this jobs output
    //as input to third job, then third jobs input path name should be "second_job_output"
    //In this way, jobs can be chained, sending output one to other as input and get the
    //final output
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));

    JobClient.runJob(job2);

Команда для запуска этих заданий:

корзина для мусора / бадья TotalWords.

Нам нужно дать окончательное имя задания для команды. В приведенном выше случае это TotalWords.

5 голосов
/ 27 октября 2015

Вы можете запустить цепочку MR в порядке, указанном в коде. ОБРАТИТЕ ВНИМАНИЕ : был предоставлен только код драйвера

public class WordCountSorting {
// here the word keys shall be sorted
      //let us write the wordcount logic first

      public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
            //THE DRIVER CODE FOR MR CHAIN
            Configuration conf1=new Configuration();
            Job j1=Job.getInstance(conf1);
            j1.setJarByClass(WordCountSorting.class);
            j1.setMapperClass(MyMapper.class);
            j1.setReducerClass(MyReducer.class);

            j1.setMapOutputKeyClass(Text.class);
            j1.setMapOutputValueClass(IntWritable.class);
            j1.setOutputKeyClass(LongWritable.class);
            j1.setOutputValueClass(Text.class);
            Path outputPath=new Path("FirstMapper");
            FileInputFormat.addInputPath(j1,new Path(args[0]));
                  FileOutputFormat.setOutputPath(j1,outputPath);
                  outputPath.getFileSystem(conf1).delete(outputPath);
            j1.waitForCompletion(true);
                  Configuration conf2=new Configuration();
                  Job j2=Job.getInstance(conf2);
                  j2.setJarByClass(WordCountSorting.class);
                  j2.setMapperClass(MyMapper2.class);
                  j2.setNumReduceTasks(0);
                  j2.setOutputKeyClass(Text.class);
                  j2.setOutputValueClass(IntWritable.class);
                  Path outputPath1=new Path(args[1]);
                  FileInputFormat.addInputPath(j2, outputPath);
                  FileOutputFormat.setOutputPath(j2, outputPath1);
                  outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                  System.exit(j2.waitForCompletion(true)?0:1);
      }

}

ПОСЛЕДОВАТЕЛЬНОСТЬ( JOB1 ) MAP-> REDUCE-> ( JOB2 ) MAPЭто было сделано для сортировки ключей, но есть и другие способы, такие как использование древовидной карты.И все же я хочу сосредоточить ваше внимание на том, как были закованы рабочие места !!Спасибо

4 голосов
/ 24 марта 2010

Вы можете использовать oozie для обработки ваших заданий MapReduce.http://issues.apache.org/jira/browse/HADOOP-5303

3 голосов
/ 28 января 2013

Мы можем использовать метод waitForCompletion(true) задания для определения зависимости между заданием.

В моем сценарии у меня было 3 задания, которые зависели друг от друга.В классе драйверов я использовал приведенный ниже код, и он работает, как и ожидалось.

public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub

        CCJobExecution ccJobExecution = new CCJobExecution();

        Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
        Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
        Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);

        System.out.println("****************Started Executing distanceTimeFraudJob ================");
        distanceTimeFraudJob.submit();
        if(distanceTimeFraudJob.waitForCompletion(true))
        {
            System.out.println("=================Completed DistanceTimeFraudJob================= ");
            System.out.println("=================Started Executing spendingFraudJob ================");
            spendingFraudJob.submit();
            if(spendingFraudJob.waitForCompletion(true))
            {
                System.out.println("=================Completed spendingFraudJob================= ");
                System.out.println("=================Started locationFraudJob================= ");
                locationFraudJob.submit();
                if(locationFraudJob.waitForCompletion(true))
                {
                    System.out.println("=================Completed locationFraudJob=================");
                }
            }
        }
    }
3 голосов
/ 26 мая 2011

В проекте Apache Mahout есть примеры, объединяющие несколько заданий MapReduce.Один из примеров можно найти по адресу:

RecommenderJob.java

http://search -lucene.com / c / Mahout: / core / src / main / java / org /апач / погонщик / ср / вкус / Hadoop / товар / RecommenderJob.java% 7C% 7CRecommenderJob

2 голосов
/ 01 февраля 2016

Новый класс org.apache.hadoop.mapreduce.lib.chain.ChainMapper поможет этому сценарию

1 голос
/ 28 февраля 2014

Если вы хотите программно связать свою работу, вы захотите использовать JobControl.Использование довольно просто:

    JobControl jobControl = new JobControl(name);

После этого вы добавляете экземпляры ControlledJob.ControlledJob определяет работу с ее зависимостями, таким образом, автоматически подключая входы и выходы, чтобы соответствовать «цепочке» заданий.Вы хотите поместить это в поток скорости.Это позволяет проверять состояние вашей цепочки во время ее работы:

    while (!jobControl.allFinished()) {
        System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
        System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
        System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
        List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
        System.out.println("Jobs in success state: " + successfulJobList.size());
        List<ControlledJob> failedJobList = jobControl.getFailedJobList();
        System.out.println("Jobs in failed state: " + failedJobList.size());
    }
...