Уменьшение картыКлюч пары, являющийся временным интервалом - PullRequest
0 голосов
/ 09 декабря 2018

У меня есть текстовый файл, содержащий в каждой строке следующее:

2018-11-27T08:06:11, 4.000000, 6.000000

2018-11-27T08:06:13, 9.000000, -1.000000

2018-11-27T08:06:15, 2.000000, -3.000000

2018-11-27T08:06:17, 1.000000, 9.000000

2018-11-27T08:06:19, 5.000000, -1.000000

Что мне нужно после применения Map-Reduce, чтобы выглядело так:

2018-11-27T08:06:00 -> 2018-11-27T08:06:30, 9.000000, 9.000000

[Значения] - это просто максимальное значение из 30-секундного интервала, но у меня возникают трудности с созданием этого ключа «Интервал времени».По сути, я хочу, чтобы мои ключи были с интервалом 30 секунд.

Я новичок в Hadoop и Map-Reduce, поэтому любые советы, идеи, ресурсы или фрагменты кода будут чрезвычайно полезны.

Заранее спасибо!

[РЕДАКТИРОВАТЬ]

Мне удалось реализовать это Map-Reduce, оно работает для моего случая, но по какой-то причине вывод отображает помимо желаемого, некоторые данные из входного файла.У вас есть идеи, почему?

Вот класс, я оставил комментарий в коде к строке, которая вызывает у меня проблемы.Если я удаляю эту строку, она больше не работает, но с этой записанной строкой я получаю этот неуклюжий вывод (содержащий данные для моего ввода + требуемый вывод)

Проблема заключается в методе «уменьшить» внизу.

public class SensorMapReducer
{
    public static class SensorMapper extends Mapper<Object, Text, Text, Text>{

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();

            if(line.startsWith("s"))
                processSensorLine(line,context);
            else
                processSimulatorLine(line,context);
        }

        private void processSensorLine(String line, Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException
        {
            String[] values = line.split(",");

            Calendar gc = DatatypeConverter.parseDateTime(values[1]);

            Text bucket = new Text(String.format("%4d-%02d-%02dT%02d:%02d:00Z",
                    gc.get(Calendar.YEAR),
                    gc.get(Calendar.MONTH) + 1,
                    gc.get(Calendar.DAY_OF_MONTH),
                    gc.get(Calendar.HOUR),
                    gc.get(Calendar.MINUTE)));

            context.write(bucket, new Text("ACC," + values[2] + "," + values[3] + "," + values[4]));
            context.write(bucket, new Text("GYRO," + values[5] + "," + values[6] + "," + values[7]));
            context.write(bucket, new Text("MAG," + values[8] + "," + values[9] + "," + values[10]));
            context.write(bucket, new Text("REST," + values[11] + "," + values[12] + "," + values[13] + "," + values[14]));
        }

        private void processSimulatorLine(String line, Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException
        {
            String[] values = line.split(",");

            Calendar gc = DatatypeConverter.parseDateTime(values[1]);

            Text bucket = new Text(String.format("%4d-%02d-%02dT%02d:%02d:00Z",
                    gc.get(Calendar.YEAR),
                    gc.get(Calendar.MONTH) + 1,
                    gc.get(Calendar.DAY_OF_MONTH),
                    gc.get(Calendar.HOUR),
                    gc.get(Calendar.MINUTE)));

            context.write(bucket, new Text("PF" + values[6]));
        }
    }

    public static class SensorReducer extends Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            XYZ acc = new XYZ(), gyro = new XYZ();
            Single mlux = new Single(), temp = new Single(), pressure = new Single(), rh = new Single();
            int pass = 0, fail = 0;

            for(Text value : values)
            {
                String val = value.toString();
                if(val.startsWith("ACC"))
                    acc.process(val);
                else if(val.startsWith("GYRO"))
                    gyro.process(val);
                else if(val.startsWith("REST"))
                {
                    String[] vals = val.split(",");
                    mlux.process(vals[1]);
                    temp.process(vals[2]);
                    pressure.process(vals[3]);
                    rh.process(vals[4]);
                }
                else if(val.startsWith("PF"))
                {
                    String pf = val.substring(2);

                    if(pf.equalsIgnoreCase("1"))
                        pass++;
                    else
                        fail++;
                }

                **// On my environment, if I don't do this it fails for no reason that I can see, but
                // I DO NOT WANT THIS LINE TO BE WRITTEN!!!!**
                context.write(key,new Text(val));
            }

            StringBuffer sb = new StringBuffer();
            acc.append(sb);
            sb.append('\t');

            gyro.append(sb);
            sb.append('\t');

            mlux.append(sb);
            sb.append('\t');

            temp.append(sb);
            sb.append('\t');

            pressure.append(sb);
            sb.append('\t');

            rh.append(sb);
            sb.append('\t');

            sb.append(pass);
            sb.append('\t');

            sb.append(fail);

            context.write(key, new Text(sb.toString()));
        }
    }

    private static class Single {

        private int val, count, min, max;

        private void process(String val)
        {
            int v = Double.valueOf(val).intValue();

            this.val += v;
            this.count++;

            max = max > v ? max : v;
            min = min > v ? min : v;
        }

        public void append(StringBuffer sb)
        {
            if(count > 0)
                sb.append(val/count);
            else
                sb.append("");

            sb.append('\t');
            sb.append(min);
            sb.append('\t');
            sb.append(max);
        }
    }

    private static class XYZ {
        double x, y, z;
        double xMax = 0.0, yMax = 0.0, zMax = 0.0;
        double xMin = 0.0, yMin = 0.0, zMin = 0.0;
        int count = 0;

        public void process(String val)
        {
            String[] vals = val.split(",");

            double x = Double.valueOf(vals[1]);
            double y = Double.valueOf(vals[2]);
            double z = Double.valueOf(vals[3]);

            xMax = xMax < x ? x : xMax;
            yMax = yMax < y ? y : yMax;
            zMax = zMax < z ? z : zMax;

            xMin = xMin < x ? x : xMin;
            yMin = yMin < y ? y : yMin;
            zMin = zMin < z ? z : zMin;

            this.x += x;
            this.y += y;
            this.z += z;

            count++;
        }

        public void append(StringBuffer sb)
        {
            sb.append(x/count);
            sb.append('\t');
            sb.append(xMin);
            sb.append('\t');
            sb.append(xMax);
            sb.append('\t');
            sb.append(y/count);
            sb.append('\t');
            sb.append(yMin);
            sb.append('\t');
            sb.append(yMax);
            sb.append('\t');
            sb.append(z/count);
            sb.append('\t');
            sb.append(zMin);
            sb.append('\t');
            sb.append(zMax);
        }
    }

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(SensorMapper.class);

        job.setMapperClass(SensorMapper.class);
        job.setCombinerClass(SensorReducer.class);
        job.setReducerClass(SensorReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Не могли бы вы помочь мне с любыми идеями?Спасибо!

...