У меня есть текстовый файл, содержащий в каждой строке следующее:
2018-11-27T08:06:11, 4.000000, 6.000000
2018-11-27T08:06:13, 9.000000, -1.000000
2018-11-27T08:06:15, 2.000000, -3.000000
2018-11-27T08:06:17, 1.000000, 9.000000
2018-11-27T08:06:19, 5.000000, -1.000000
Что мне нужно после применения Map-Reduce, чтобы выглядело так:
2018-11-27T08:06:00 -> 2018-11-27T08:06:30, 9.000000, 9.000000
[Значения] - это просто максимальное значение из 30-секундного интервала, но у меня возникают трудности с созданием этого ключа «Интервал времени».По сути, я хочу, чтобы мои ключи были с интервалом 30 секунд.
Я новичок в Hadoop и Map-Reduce, поэтому любые советы, идеи, ресурсы или фрагменты кода будут чрезвычайно полезны.
Заранее спасибо!
[РЕДАКТИРОВАТЬ]
Мне удалось реализовать это Map-Reduce, оно работает для моего случая, но по какой-то причине вывод отображает помимо желаемого, некоторые данные из входного файла.У вас есть идеи, почему?
Вот класс, я оставил комментарий в коде к строке, которая вызывает у меня проблемы.Если я удаляю эту строку, она больше не работает, но с этой записанной строкой я получаю этот неуклюжий вывод (содержащий данные для моего ввода + требуемый вывод)
Проблема заключается в методе «уменьшить» внизу.
public class SensorMapReducer
{
public static class SensorMapper extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
if(line.startsWith("s"))
processSensorLine(line,context);
else
processSimulatorLine(line,context);
}
private void processSensorLine(String line, Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException
{
String[] values = line.split(",");
Calendar gc = DatatypeConverter.parseDateTime(values[1]);
Text bucket = new Text(String.format("%4d-%02d-%02dT%02d:%02d:00Z",
gc.get(Calendar.YEAR),
gc.get(Calendar.MONTH) + 1,
gc.get(Calendar.DAY_OF_MONTH),
gc.get(Calendar.HOUR),
gc.get(Calendar.MINUTE)));
context.write(bucket, new Text("ACC," + values[2] + "," + values[3] + "," + values[4]));
context.write(bucket, new Text("GYRO," + values[5] + "," + values[6] + "," + values[7]));
context.write(bucket, new Text("MAG," + values[8] + "," + values[9] + "," + values[10]));
context.write(bucket, new Text("REST," + values[11] + "," + values[12] + "," + values[13] + "," + values[14]));
}
private void processSimulatorLine(String line, Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException
{
String[] values = line.split(",");
Calendar gc = DatatypeConverter.parseDateTime(values[1]);
Text bucket = new Text(String.format("%4d-%02d-%02dT%02d:%02d:00Z",
gc.get(Calendar.YEAR),
gc.get(Calendar.MONTH) + 1,
gc.get(Calendar.DAY_OF_MONTH),
gc.get(Calendar.HOUR),
gc.get(Calendar.MINUTE)));
context.write(bucket, new Text("PF" + values[6]));
}
}
public static class SensorReducer extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
XYZ acc = new XYZ(), gyro = new XYZ();
Single mlux = new Single(), temp = new Single(), pressure = new Single(), rh = new Single();
int pass = 0, fail = 0;
for(Text value : values)
{
String val = value.toString();
if(val.startsWith("ACC"))
acc.process(val);
else if(val.startsWith("GYRO"))
gyro.process(val);
else if(val.startsWith("REST"))
{
String[] vals = val.split(",");
mlux.process(vals[1]);
temp.process(vals[2]);
pressure.process(vals[3]);
rh.process(vals[4]);
}
else if(val.startsWith("PF"))
{
String pf = val.substring(2);
if(pf.equalsIgnoreCase("1"))
pass++;
else
fail++;
}
**// On my environment, if I don't do this it fails for no reason that I can see, but
// I DO NOT WANT THIS LINE TO BE WRITTEN!!!!**
context.write(key,new Text(val));
}
StringBuffer sb = new StringBuffer();
acc.append(sb);
sb.append('\t');
gyro.append(sb);
sb.append('\t');
mlux.append(sb);
sb.append('\t');
temp.append(sb);
sb.append('\t');
pressure.append(sb);
sb.append('\t');
rh.append(sb);
sb.append('\t');
sb.append(pass);
sb.append('\t');
sb.append(fail);
context.write(key, new Text(sb.toString()));
}
}
private static class Single {
private int val, count, min, max;
private void process(String val)
{
int v = Double.valueOf(val).intValue();
this.val += v;
this.count++;
max = max > v ? max : v;
min = min > v ? min : v;
}
public void append(StringBuffer sb)
{
if(count > 0)
sb.append(val/count);
else
sb.append("");
sb.append('\t');
sb.append(min);
sb.append('\t');
sb.append(max);
}
}
private static class XYZ {
double x, y, z;
double xMax = 0.0, yMax = 0.0, zMax = 0.0;
double xMin = 0.0, yMin = 0.0, zMin = 0.0;
int count = 0;
public void process(String val)
{
String[] vals = val.split(",");
double x = Double.valueOf(vals[1]);
double y = Double.valueOf(vals[2]);
double z = Double.valueOf(vals[3]);
xMax = xMax < x ? x : xMax;
yMax = yMax < y ? y : yMax;
zMax = zMax < z ? z : zMax;
xMin = xMin < x ? x : xMin;
yMin = yMin < y ? y : yMin;
zMin = zMin < z ? z : zMin;
this.x += x;
this.y += y;
this.z += z;
count++;
}
public void append(StringBuffer sb)
{
sb.append(x/count);
sb.append('\t');
sb.append(xMin);
sb.append('\t');
sb.append(xMax);
sb.append('\t');
sb.append(y/count);
sb.append('\t');
sb.append(yMin);
sb.append('\t');
sb.append(yMax);
sb.append('\t');
sb.append(z/count);
sb.append('\t');
sb.append(zMin);
sb.append('\t');
sb.append(zMax);
}
}
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(SensorMapper.class);
job.setMapperClass(SensorMapper.class);
job.setCombinerClass(SensorReducer.class);
job.setReducerClass(SensorReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Не могли бы вы помочь мне с любыми идеями?Спасибо!