Я написал программу mapreduce, но когда я попытался запустить на hadoop, это не удалось, так как он генерирует столько промежуточных данных, что я получаю сообщение об ошибке: на узле больше нет места. После того, как он пытается со вторым узлом, но результат тот же. Я хотел бы обработать два текстовых файла: примерно ~ 60 тыс. Строк.
Я попытался: - включить сжатие Snappy, но это не помогло. - добавьте больше места, чтобы у двух узлов было хранилище 50-50 ГБ
Поскольку ни один из них не помог, возможно, проблема в коде, а не в настройке.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class FirstMapper extends Mapper<LongWritable, Text, Text, Text> {
enum POS_TAG {
CC, CD, DT, EX,
FW, IN, JJ, JJR,
JJS, LS, MD, NN,
NNS, NNP, NNPS, PDT,
WDT, WP, POS, PRP,
PRP$, RB, RBR, RBS,
RP, SYM, TO, UH,
VB, VBD, VBG, VBN,
VBP, VBZ, WP$, WRB
}
private static final List<String> tags = Stream.of(POS_TAG.values())
.map(Enum::name)
.collect(Collectors.toList());
private static final int MAX_NGRAM = 5;
private static String[][] cands = {
new String[3],
new String[10],
new String[32],
new String[10]
};
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
String location = conf.get("job.cands.path");
if (location != null) {
BufferedReader br = null;
try {
FileSystem fs = FileSystem.get(conf);
Path path = new Path(location);
if (fs.exists(path)) {
FSDataInputStream fis = fs.open(path);
br = new BufferedReader(new InputStreamReader(fis));
String line;
int i = 0;
while ((line = br.readLine()) != null) {
String[] splitted = line.split(" ");
cands[i] = splitted;
i++;
}
}
} catch (IOException e) {
//
} finally {
br.close();
}
}
}
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split(" ");
int m = tokens.length;
for (int n = 2; n <= MAX_NGRAM; n++) {
for (int s = 0; s <= m - n; s++) {
for (int i = 0; i < cands[n - 2].length; i++) {
List<String> pattern = new ArrayList<>();
List<String> metWords = new ArrayList<>();
for (int j = 0; j <= n - 1; j++) {
String[] pair = tokens[s + j].split("/");
String word = pair[0];
String pos = pair[1];
char c = cands[n - 2][i].charAt(j);
addToPattern(word, pos, c, pattern);
if (c > 0 && tags.contains(pos)) {
metWords.add(word);
}
}
if (metWords.isEmpty()) {
metWords.add("_NONE");
}
Text resultKey = new Text(pattern.toString() + ";" + metWords.toString());
context.write(resultKey, new Text(key.toString()));
}
}
}
}
public void addToPattern(String word, String pos, char c, List<String> pattern) {
switch (c) {
case 'w':
pattern.add(word);
break;
case 'p':
pattern.add(pos);
break;
default:
pattern.add("_WC_");
break;
}
}
}
public class Main {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("job.cands.path", "/user/thelfter/pwp");
Job job1 = Job.getInstance(conf, "word pattern1");
job1.setJarByClass(Main.class);
job1.setMapperClass(FirstMapper.class);
job1.setCombinerClass(FirstReducer.class);
job1.setReducerClass(FirstReducer.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path("/user/thelfter/output"));
System.exit(job1.waitForCompletion(true) ? 0 : 1);
}
}