Работа EMR Streaming с использованием Java-кода для мапперов и редукторов - PullRequest
0 голосов
/ 30 января 2012

В настоящее время у меня выполняются потоковые задания с кодом мапперов и редукторов, написанным на ruby.Я хочу преобразовать их в Java.Я не знаю, как запустить потоковое задание с помощью EMR Hadoop с использованием Java.Пример, приведенный на веб-сайте Amazon EMR о облачных взрывах, слишком сложен.Ниже приведены подробные сведения о том, как я выполняю задания в настоящий момент.

Код для запуска задания:

        elastic-mapreduce --create --alive --plain-output --master-instance-type m1.small 
--slave-instance-type m1.xlarge --num-instances 2  --name "Job Name" --bootstrap-action 
    s3://bucket-path/bootstrap.sh

Код для добавления шага:

    elastic-mapreduce -j <job_id> --stream --step-name "my_step_name" 
--jobconf mapred.task.timeout=0 --mapper s3://bucket-path/mapper.rb 
--reducer s3://bucket-path/reducerRules.rb --cache s3://bucket-path/cache/cache.txt 
--input s3://bucket-path/input --output s3://bucket-path/output

Код Mapperчитает из CSV-файла, который упоминается выше в качестве аргумента кэша EMR, а также из входной корзины s3, в которой также есть некоторые CSV-файлы, выполняет некоторые вычисления и выводит выходные строки CSV в стандартный вывод.

//mapper.rb 
CSV_OPTIONS  = {
  // some CSV options
}

begin
    file = File.open("cache.txt")
    while (line = file.gets)
        // do something
    end
    file.close
end

input  = FasterCSV.new(STDIN, CSV_OPTIONS)
input.each{ 
// do calculations and get result
puts (result)
}

//reducer.rb

$stdin.each_line do |line|
// do some aggregations and get aggregation_result
if(some_condition) puts(aggregation_result)
end

Ответы [ 2 ]

0 голосов
/ 07 ноября 2012

Так как теперь у меня есть лучшая крепость на Hadoop и Mapreduce, вот что я ожидал:

Чтобы запустить кластер, код останется более или менее таким же, как в вопросе, но мы можем добавить параметры конфигурации:

ruby elastic-mapreduce --create --alive --plain-output --master-instance-type m1.xlarge --slave-instance-type m1.xlarge --num-instances 11  --name "Java Pipeline" --bootstrap-action s3://elasticmapreduce/bootstrap-actions/install-ganglia --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop --args "--mapred-config-file, s3://com.versata.emr/conf/mapred-site-tuned.xml"

Чтобы добавить рабочие шаги:

Шаг 1:

ruby elastic-mapreduce --jobflow <jobflo_id> --jar s3://somepath/job-one.jar --arg s3://somepath/input-one --arg s3://somepath/output-one --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0

Шаг 2:

ruby elastic-mapreduce --jobflow <jobflo_id> --jar s3://somepath/job-two.jar --arg s3://somepath/output-one --arg s3://somepath/output-two --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0

Теперь, что касается кода Java, будет один класс Main, который будет содержать одну реализацию для каждого из следующих классов:

  • org.apache.hadoop.mapreduce.Mapper; * * тысяча двадцать-одна
  • org.apache.hadoop.mapreduce.Reducer;

Каждый из них должен переопределить методы map () и lower () для выполнения желаемой работы.

Класс Java для рассматриваемой проблемы будет выглядеть следующим образом:

public class SomeJob extends Configured implements Tool {

    private static final String JOB_NAME = "My Job";

    /**
     * This is Mapper.
     */
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

            // Get the cached file
            Path file = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];

            File fileObject = new File (file.toString());
            // Do whatever required with file data
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            outputKey.set("Some key calculated or derived");
            outputVey.set("Some Value calculated or derived");
            context.write(outputKey, outputValue);
        }
        }

    /**
     * This is Reducer.
     */
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

    private Text outputKey = new Text();
    private Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            outputKey.set("Some key calculated or derived");
            outputVey.set("Some Value calculated or derived");
            context.write(outputKey, outputValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        try {
            Configuration conf = getConf();
            DistributedCache.addCacheFile(new URI(args[2]), conf);
            Job job = new Job(conf);

            job.setJarByClass(TaxonomyOverviewReportingStepOne.class);
            job.setJobName(JOB_NAME);

            job.setMapperClass(MapJob.class);
            job.setReducerClass(ReduceJob.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }

    }

    public static void main(String[] args) throws Exception {

        if (args.length < 3) {
            System.out
                    .println("Usage: SomeJob <comma sparated list of input directories> <output dir> <cache file>");
            System.exit(-1);
        }

        int result = ToolRunner.run(new TaxonomyOverviewReportingStepOne(), args);
        System.exit(result);
    }

}
0 голосов
/ 24 апреля 2012

Вы не используете потоковую передачу, если вы используете Java.Вы создаете Jar-файл непосредственно против API MapReduce.

В папке примеров исходного кода hadoop есть несколько хороших примеров того, как это сделать, включая печально известный wordcount: https://github.com/apache/hadoop/tree/trunk/src/examples/org/apache/hadoop/examples

IЯ не совсем уверен, почему вы хотите использовать Java, но кодирование непосредственно в API может быть болезненным.Возможно, вы захотите попробовать одно из следующих: Проекты Java:

Не Java:

FWIWЯ думаю, что Свин, вероятно, будет моим выбором, и поддерживается из коробки на EMR.

...