Problem with Hadoop / Cloudera logs
0 votes / 02 May 2018

Hi all,

Just trying to understand the flow of a test program from one of the examples. I added a logger, but in the Cloudera VM I can only see the reducer's log output.

I cannot see any log output from the Mapper or Partitioner classes in the Cloudera VM under the Job -> Logs section. Can you help me figure out whether I am missing something?
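
(For what it's worth: on YARN/MRv2 every task attempt runs in its own container, and the Partitioner executes inside the map task's JVM, so its log lines should land in the map containers' syslog rather than next to the reducer's. The job UI tends to show one container's log at a time; if log aggregation is enabled, the full set for a finished run can be dumped with yarn logs -applicationId <application_id>, taking the id from the ResourceManager UI.)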

Here is the simple code and the log4j configuration.

The job:

package org.apress.prohadoop.c6;

import java.io.IOException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.apress.prohadoop.utils.AirlineDataUtils;

public class SortAscMonthDescWeekMRJob extends Configured implements Tool {

    protected static final Logger log = Logger.getLogger(SortAscMonthDescWeekMRJob.class.getName());


    public static class SortAscMonthDescWeekMapper extends
            Mapper<LongWritable, Text, MonthDoWWritable, DelaysWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (!AirlineDataUtils.isHeader(value)) {
                String[] contents = value.toString().split(",");
                String month = AirlineDataUtils.getMonth(contents);
                String dow = AirlineDataUtils.getDayOfTheWeek(contents);
                MonthDoWWritable mw = new MonthDoWWritable();
                mw.month = new IntWritable(Integer.parseInt(month));
                mw.dayOfWeek = new IntWritable(Integer.parseInt(dow));

                log.info("Inside mapper " + "Month:"+ mw.month +"Week:"+":"+mw.dayOfWeek);
                DelaysWritable dw = AirlineDataUtils.parseDelaysWritable(value
                        .toString());
                context.write(mw, dw);

            }
        }
    }

    public static class SortAscMonthDescWeekReducer extends
            Reducer<MonthDoWWritable, DelaysWritable, NullWritable, Text> {
        @Override
        public void reduce(MonthDoWWritable key,
                Iterable<DelaysWritable> values, Context context)
                throws IOException, InterruptedException {

            for (DelaysWritable val : values) {

                 log.info("Inside Reducer DelaysWritable value " + val);
                context.write(
                        NullWritable.get(),
                        new Text(AirlineDataUtils
                                .parseDelaysWritableToText(val)));
            }
        }
    }

    public static class MonthDoWPartitioner extends
            Partitioner<MonthDoWWritable, DelaysWritable> implements Configurable {
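        // Implementing Configurable means the framework will call setConf()
        // with the job Configuration when it instantiates the partitioner.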
        private Configuration conf = null;
        private int indexRange = 0;

        private int getDefaultRange() {

            int minIndex = 0;
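            // 12 months x 7 days flattened into indices 0..(11*7 + 6) = 0..83.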
            int maxIndex = 11 * 7 + 6;
            int range = (maxIndex - minIndex) + 1;
            log.info("Inside MonthDoWPartitioner Calculating range " + range);
            return range;
        }

        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
            this.indexRange = conf.getInt("key.range", getDefaultRange());
            log.info("Inside MonthDoWPartitioner Calculating indexRange " + indexRange);
        }

        @Override
        public Configuration getConf() {
            return this.conf;
        }

        @Override
        public int getPartition(MonthDoWWritable key, DelaysWritable value,
                int numReduceTasks) {
            return AirlineDataUtils.getCustomPartition(key, indexRange,
                    numReduceTasks);
        }
    }

    @Override
    public int run(String[] allArgs) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(SortAscMonthDescWeekMRJob.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(MonthDoWWritable.class);
        job.setMapOutputValueClass(DelaysWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SortAscMonthDescWeekMapper.class);
        job.setReducerClass(SortAscMonthDescWeekReducer.class);
        job.setPartitionerClass(MonthDoWPartitioner.class);

        String[] args = new GenericOptionsParser(getConf(), allArgs)
                .getRemainingArgs();
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate the job's success or failure instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Pass conf to ToolRunner so it is the one returned by getConf() in run().
        System.exit(ToolRunner.run(conf, new SortAscMonthDescWeekMRJob(), args));
    }

}
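
A side note on log levels: as far as I can tell, on MRv2 the task containers do not read the client-side log4j.properties below; per-task verbosity is driven by the mapreduce.map.log.level and mapreduce.reduce.log.level job properties. A minimal driver sketch (the class name and the DEBUG value are illustrative only, not part of the book's example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TaskLogLevelSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Standard MRv2 properties that set the log4j level of the
        // map and reduce task JVMs (the default for both is INFO).
        conf.set("mapreduce.map.log.level", "DEBUG");
        conf.set("mapreduce.reduce.log.level", "DEBUG");
        Job job = Job.getInstance(conf, "task-log-level-demo");
        // ...configure mapper/reducer/partitioner/paths as in the job above...
    }
}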



The log4j properties:

# Define some default values that can be overridden by system properties
hadoop.root.logger=INFO,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log

# Define the root logger to the system property "hadoop.root.logger".
log4j.rootLogger=${hadoop.root.logger}, EventCounter

# Logging Threshold
log4j.threshold=ALL

# Null Appender
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender

#
# Rolling File Appender - cap space usage at 5gb.
#
hadoop.log.maxfilesize=256MB
hadoop.log.maxbackupindex=20
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}

log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}

log4j.appender.RFA.layout=org.apache.log4j.PatternLayout

# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n


#
# Daily Rolling File Appender
#

log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}

# Rollover at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd

# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout

# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n


#
# console
# Add "console" to rootlogger above if you want to use this 
#

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

#
# TaskLog Appender
#

#Default values
hadoop.tasklog.taskid=null
hadoop.tasklog.iscleanup=false
hadoop.tasklog.noKeepSplits=4
hadoop.tasklog.totalLogFileSize=100
hadoop.tasklog.purgeLogSplits=true
hadoop.tasklog.logsRetainHours=12

log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}

log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

#
# HDFS block state change log from block manager
#
# Uncomment the following to suppress normal block state change
# messages from BlockManager in NameNode.
#log4j.logger.BlockStateChange=WARN
...
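
One more observation: this looks like the stock client-side log4j.properties. For code that does read this file, the usual log4j idiom for raising one package's verbosity is a per-logger line, e.g. (package name taken from the job above; the DEBUG value is illustrative):

# Example override, not in the stock file
log4j.logger.org.apress.prohadoop=DEBUG

For the task-side logs, though, the container JVMs are configured separately (the container log4j setup plus the mapreduce.*.log.level properties mentioned above), so edits here typically affect client and daemon logging rather than what shows up under the job's Logs section.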