Map Сокращение задания для анализа журнала, не работающего в псевдораспределенном режиме Hadoop 2.7.3 - PullRequest
0 голосов
/ 04 февраля 2019

Я новичок в мире больших данных и назначил POC для обработки журналов, сгенерированных веб-приложением.Я успешно настроил hadoop в псевдораспределенном режиме на виртуальной машине Linux и сумел внедрить журналы веб-сервера с сервера Windows в hdfs, используя flume.Далее я написал программу mapreduce для анализа логов, которая отлично работает в Eclipse.Но когда я экспортирую jar-файл и перемещаю его на виртуальную машину hadoop, задание успешно завершается, выходные каталоги создаются в hdfs, но файл part- * пуст.Я проверил свой набор входных данных, проверив код на локальном затмении.Я пробовал удаленную отладку приложения, точки останова в основном методе обращаются, но в методе карты нет.Любая помощь будет оценена.

Я достаточно искал в интернете, но не смог найти ничего похожего.Ниже мой код

public class OneLoadTransactionsSuccessCount {

    public static class OneLoadLogMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private static final Logger logger = Logger.getLogger(OneLoadLogMapper.class);
    private final static String SUCCESS_CODE = "0";

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        SimpleDateFormat sdf = new SimpleDateFormat("hh:mm dd MMM, yyyy");
        String text = value.toString();
        logger.info("text:::: " + text);
        System.out.println("text:::: " + text);
        int startingTag = text.indexOf("[#");
        int endingTag = text.indexOf("#]");
        if (startingTag != -1 && endingTag != -1) {
        try {

            String completeLog = text.substring(startingTag + 1, endingTag);
            logger.info("completeLog:::: " + completeLog);
            System.out.println("completeLog:::: " + completeLog);
            String[] tokens = completeLog.split("\\|");
            if (tokens != null && tokens.length > 0) {
            logger.info(tokens[1]);
            System.out.println(tokens[1]);
            if (tokens[6] != null) {
                String responseXML = tokens[6];
                logger.info("responseXML:::: " + responseXML);
                System.out.println("responseXML:::: " + responseXML);
                JAXBContext jaxbContext = JAXBContext.newInstance(LoadResponseMsg.class);
                Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();

                StringReader reader = new StringReader(responseXML);

                InputStream inputStream = new ByteArrayInputStream(
                    responseXML.getBytes(Charset.forName("UTF-8")));

                TransformerFactory factory = TransformerFactory.newInstance();
                Source xslt = new StreamSource(new File("removeNs.xslt"));
                Transformer transformer = factory.newTransformer(xslt);

                Source src = new StreamSource(inputStream);
                transformer.transform(src, new StreamResult(new File("tempoutput.xml")));

                File responseXMLFile = new File("tempoutput.xml");



                jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {

                @Override
                public boolean handleEvent(ValidationEvent event) {
                    throw new RuntimeException(event.getMessage(), event.getLinkedException());
                }
                });

                LoadResponseMsg loadResponseMsg = (LoadResponseMsg) jaxbUnmarshaller
                    .unmarshal(responseXMLFile);
                if (loadResponseMsg != null) {
                logger.info("reader:::: " + reader.toString());
                System.out.println("reader:::: " + reader.toString());
                if (loadResponseMsg.getResponseHeader().getResponseCode()
                    .equalsIgnoreCase(SUCCESS_CODE)) {
                    logger.info("status::: " + loadResponseMsg.getLoadResponse().getDescription());
                    System.out
                        .println("status::: " + loadResponseMsg.getLoadResponse().getDescription());
                    word.set(loadResponseMsg.getLoadResponse().getCompanyShortName());
                    context.write(word, one);
                }
                }

            }

            }
        } catch (JAXBException e) {
            logger.error(e);
        } catch (IOException e) {
            logger.error(e);
        } catch (Exception e) {
            logger.error(e);
        }
        }
    }
    }

    public static class OneLoadLogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
        sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
    }

    public static void main(String[] args) throws Exception {
    try {

        Configuration conf = new Configuration();
        System.out.println("in main method");
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(OneLoadTransactionsSuccessCount.class);
        job.setMapperClass(OneLoadLogMapper.class);
        job.setCombinerClass(OneLoadLogReducer.class);
        job.setReducerClass(OneLoadLogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.out.println("about to run the job");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } catch (Exception e) {
        System.err.println(e);
        e.printStackTrace();
    }

    }
}

1 Ответ

0 голосов
/ 13 февраля 2019

Итак, была задействована операция ввода-вывода.Я добавил нужный файл в classpath при создании jar, но моей средой разработки были windows, и я запускал его в Linux.проблема была из-за разных файловых систем.программа искала файл в неправильном месте в Linux.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...