Hi all,
I am just trying to understand the flow of a test program from one of the examples. I added a logger, but in the Cloudera VM I can only see the Reducer's log output.
I cannot see anything logged by the Mapper and Partitioner classes in the Cloudera VM under the Job -> Logs section. Can you help me figure out whether I am missing something?
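For context, my understanding is that each map and reduce task runs in its own YARN container, so the Mapper and Partitioner messages should end up in the map tasks' container logs rather than in the reducer's. In case it is useful, this is how I have been trying to pull all of the container logs at once (assuming YARN log aggregation is enabled; the application ID is the one printed when the job is submitted):

yarn logs -applicationId <applicationId>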
Here is the simple code and the log4j configuration.
The simple job:
package org.apress.prohadoop.c6;
import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.apress.prohadoop.utils.AirlineDataUtils;
public class SortAscMonthDescWeekMRJob extends Configured implements Tool {
    protected static final Logger log = Logger.getLogger(SortAscMonthDescWeekMRJob.class.getName());
    public static class SortAscMonthDescWeekMapper extends
            Mapper<LongWritable, Text, MonthDoWWritable, DelaysWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (!AirlineDataUtils.isHeader(value)) {
                String[] contents = value.toString().split(",");
                String month = AirlineDataUtils.getMonth(contents);
                String dow = AirlineDataUtils.getDayOfTheWeek(contents);
                MonthDoWWritable mw = new MonthDoWWritable();
                mw.month = new IntWritable(Integer.parseInt(month));
                mw.dayOfWeek = new IntWritable(Integer.parseInt(dow));
                log.info("Inside mapper Month: " + mw.month + " Week: " + mw.dayOfWeek);
                DelaysWritable dw = AirlineDataUtils.parseDelaysWritable(value.toString());
                context.write(mw, dw);
            }
        }
    }
    public static class SortAscMonthDescWeekReducer extends
            Reducer<MonthDoWWritable, DelaysWritable, NullWritable, Text> {
        @Override
        public void reduce(MonthDoWWritable key, Iterable<DelaysWritable> values,
                Context context) throws IOException, InterruptedException {
            for (DelaysWritable val : values) {
                log.info("Inside Reducer DelaysWritable value " + val);
                context.write(NullWritable.get(),
                        new Text(AirlineDataUtils.parseDelaysWritableToText(val)));
            }
        }
    }
    public static class MonthDoWPartitioner extends
            Partitioner<MonthDoWWritable, DelaysWritable> implements Configurable {
        private Configuration conf = null;
        private int indexRange = 0;

        private int getDefaultRange() {
            // Assuming zero-based indices (months 0-11, days 0-6), the maximum
            // index is 11 * 7 + 6 = 83, giving a range of 84 buckets
            // (12 months x 7 days of the week).
            int minIndex = 0;
            int maxIndex = 11 * 7 + 6;
            int range = (maxIndex - minIndex) + 1;
            log.info("Inside MonthDoWPartitioner Calculating range " + range);
            return range;
        }

        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
            this.indexRange = conf.getInt("key.range", getDefaultRange());
            log.info("Inside MonthDoWPartitioner Calculating indexRange " + indexRange);
        }

        @Override
        public Configuration getConf() {
            return this.conf;
        }

        @Override
        public int getPartition(MonthDoWWritable key, DelaysWritable value,
                int numReduceTasks) {
            return AirlineDataUtils.getCustomPartition(key, indexRange, numReduceTasks);
        }
    }
    public int run(String[] allArgs) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(SortAscMonthDescWeekMRJob.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(MonthDoWWritable.class);
        job.setMapOutputValueClass(DelaysWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(SortAscMonthDescWeekMapper.class);
        job.setReducerClass(SortAscMonthDescWeekReducer.class);
        job.setPartitionerClass(MonthDoWPartitioner.class);
        String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs();
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Return a non-zero exit code if the job fails
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Pass the Configuration to ToolRunner so that -D options reach the job
        ToolRunner.run(conf, new SortAscMonthDescWeekMRJob(), args);
    }
}
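For completeness, I also considered switching from the log4j Logger to Apache Commons Logging, which is what the Hadoop classes themselves use. This is just a sketch of the change, assuming commons-logging is on the task classpath (it should be, since Hadoop itself depends on it):

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class SortAscMonthDescWeekMRJob extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(SortAscMonthDescWeekMRJob.class);
    // ... the rest of the job is unchanged, with log.info(...) replaced by LOG.info(...)
}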
Log4j properties
# Define some default values that can be overridden by system properties
hadoop.root.logger=INFO,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log
# Define the root logger to the system property "hadoop.root.logger".
log4j.rootLogger=${hadoop.root.logger}, EventCounter
# Logging Threshold
log4j.threshold=ALL
# Null Appender
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
#
# Rolling File Appender - cap space usage at 5 GB.
#
hadoop.log.maxfilesize=256MB
hadoop.log.maxbackupindex=20
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
#
# Daily Rolling File Appender
#
log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
# Rollover at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
#
# console
# Add "console" to rootlogger above if you want to use this
#
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
#
# TaskLog Appender
#
# Default values
hadoop.tasklog.taskid=null
hadoop.tasklog.iscleanup=false
hadoop.tasklog.noKeepSplits=4
hadoop.tasklog.totalLogFileSize=100
hadoop.tasklog.purgeLogSplits=true
hadoop.tasklog.logsRetainHours=12
log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
#
# HDFS block state change log from block manager
#
# Uncomment the following to suppress normal block state change
# messages from BlockManager in NameNode.
#log4j.logger.BlockStateChange=WARN
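One more thing I was unsure about: whether the task JVMs apply a level that filters out my package. As an experiment (this is my own guess, not something from the example), I was going to append an explicit logger entry for my package to this same log4j.properties:

# My assumption: force INFO for my own classes in the task logs
log4j.logger.org.apress.prohadoop=INFO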