У меня проблема с fileNotFound с использованием приведенного ниже кода, это простой тест для распределенного кэша, я не знаю, в чем проблема?
Путь к файлу правильный, но я не могу найти файлына датоде:
package mapJoinTest2;
/*
* this is for map join using DistributedCache
* using class Path to get cache file in datanode
*
* 2012.1.13
* */
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class wordCount {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
private Text word = new Text();
private Text mapKey = new Text();
private Path [] localFile= new Path[1];
private FileSystem fs;
public void configure(JobConf job){
try {
fs = FileSystem.getLocal(new Configuration());
localFile = DistributedCache.getLocalCacheFiles(job);
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
for(Path f:localFile){
System.out.println(f.toString());
}
mapKey.set("success");
output.collect(mapKey, value);
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(wordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(Map.class);
conf.setNumReduceTasks(0);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
String path ="hdfs://namenode:9000/hadoop/test1"; // this file has already put on hdfs.
Path filePath = new Path(path);
String uriWithLink = filePath.toUri().toString();
DistributedCache.addCacheFile(new URI(uriWithLink), conf);
JobClient.runJob(conf);
}
}
Я получаю NullPointerException
в этот момент:
for(Path f:localFile){
System.out.println(f.toString());
}
Проблема в том, что значение f равно null
.
Iиспользуйте код ниже, но он не будет работать.
DistributedCache.createdSymlink(conf);
DistributedCache.addCacheFile(new Path("hdfs://namenode:9000/hadoop/test1").toUri().toString() + "#" + "test1",conf);