Не удалось загрузить файл в распределенный кеш в URI ||Получение NULLPointerException - PullRequest
0 голосов
/ 17 мая 2018

Я пытаюсь написать работу по сокращению карты, которая выполняет анализ настроений, я использую AFINN.txt в качестве словаря. Во время работы с картой, чтобы уменьшить объем работы, я помещаю ее в файл внутри HDFS и пытаюсь запустить, но каждый раз, когда это не удается. Я использую приведенный ниже код для сравнения слов с AFINN

    public class Sentiment_Analysis extends Configured implements Tool {

public static class Map extends Mapper<LongWritable, Text, Text, Text> {

    private URI[] files;

    private HashMap<String, String> AFINN_map = new HashMap<String, String>();

    @Override
    public void setup(Context context) throws IOException

    {

        files = DistributedCache.getCacheFiles(context.getConfiguration());

        System.out.println("files:" + files);

        Path path = new Path(files[0]); // here i am getting the Exception

        FileSystem fs = FileSystem.get(context.getConfiguration());

        FSDataInputStream in = fs.open(path);

        BufferedReader br = new BufferedReader(new InputStreamReader(in));

        String line = "";

        while ((line = br.readLine()) != null)

        {

            String splits[] = line.split("\t");

            AFINN_map.put(splits[0], splits[1]);

        }

        br.close();

        in.close();

    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String twt;
        String line = value.toString();
        String[] tuple = line.split("\\n");
        JSONParser jsonParser = new JSONParser();

        try {

            for (int i = 0; i < tuple.length; i++) {

                JSONObject obj = (JSONObject) jsonParser.parse(line);

                String tweet_id = (String) obj.get("id_str");

                String tweet_text = (String) obj.get("text");
                twt = (String) obj.get("text");
                String[] splits = twt.toString().split(" ");

                int sentiment_sum = 0;

                for (String word : splits) {

                    if (AFINN_map.containsKey(word))

                    {

                        Integer x = new Integer(AFINN_map.get(word));

                        sentiment_sum += x;

                    }

                }

                context.write(
                        new Text(tweet_id),
                        new Text(tweet_text + "\t----->\t"
                                + new Text(Integer.toString(sentiment_sum))));

            }

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

public static class Reduce extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Text value, Context context)
            throws IOException, InterruptedException {

        context.write(key, value);

    }

}

public static void main(String[] args) throws Exception

{

    ToolRunner.run(new Sentiment_Analysis(), args);

}

@Override
public int run(String[] args) throws Exception {

    Configuration conf = new Configuration();

    if (args.length != 2) {
        System.err.println("Usage: Parse <in> <out>");
        System.exit(2);
    }

    Job job = new Job(conf, "SentimentAnalysis");
    DistributedCache.addCacheFile(new URI("hdfs://localhost:50070//sentimentInput//AFINN.txt"), conf);
    job.setJarByClass(Sentiment_Analysis.class);

    job.setMapperClass(Map.class);

    job.setReducerClass(Reduce.class);

    job.setMapOutputKeyClass(Text.class);

    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(NullWritable.class);

    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);

    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);

    return 0;

}

}

Мой локальный URL-адрес

 http://localhost:50070/

Но я поместил файл в формате hdf, используя следующие комнады

  bin/hdfs dfs -ls /sentimentInput
  18/05/17 12:25:46 WARN util.NativeCodeLoader: Unable to load native-hadoop 
  library for your platform... using builtin-java classes where applicable
  Found 2 items
  -rw-r--r--   1 jeet supergroup      28094 2018-05-17 11:43 
 /sentimentInput/AFINN.txt
 -rw-r--r--   1 jeet supergroup   13965969 2018-05-17 11:33 
/sentimentInput/FlumeData.1440939532959

, который говорит, что файл присутствует там, но когда я запускаю задание, он показывает мне следующую ошибку

bin/yarn jar ../sentiment.jar com.jeet.sentiment.Sentiment_Analysis /sentimentInput /sentimentOutput5


Exception in thread "main" java.lang.IllegalArgumentException: Pathname /localhost:50070/sentimentInput/AFINN.txt from hdfs:/localhost:50070/sentimentInput/AFINN.txt is not a valid DFS filename.
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:195)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:104)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1089)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1085)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1085)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)

Может кто-нибудь сказать мне, как я дам правильный путь к файлу, чтобы я мог проверить свой код, пожалуйста?

1 Ответ

0 голосов
/ 18 мая 2018

Ваш URI отсутствует /:

HDFS: //localhost.....

Edit:

попробуйте использовать обновленные методы для кэшированных файлов:

job.addCacheFile(uri);

content.getCachedFiles()
...