Hadoop mapreduce job создает слишком большие промежуточные файлы - PullRequest
0 голосов
/ 06 ноября 2019

Я написал программу mapreduce, но когда я попытался запустить на hadoop, это не удалось, так как он генерирует столько промежуточных данных, что я получаю сообщение об ошибке: на узле больше нет места. После того, как он пытается со вторым узлом, но результат тот же. Я хотел бы обработать два текстовых файла: примерно ~ 60 тыс. Строк.

Я попытался: - включить сжатие Snappy, но это не помогло. - добавьте больше места, чтобы у двух узлов было хранилище 50-50 ГБ

Поскольку ни один из них не помог, возможно, проблема в коде, а не в настройке.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FirstMapper extends Mapper<LongWritable, Text, Text, Text> {

    enum POS_TAG {
        CC, CD, DT, EX,
        FW, IN, JJ, JJR,
        JJS, LS, MD, NN,
        NNS, NNP, NNPS, PDT,
        WDT, WP, POS, PRP,
        PRP$, RB, RBR, RBS,
        RP, SYM, TO, UH,
        VB, VBD, VBG, VBN,
        VBP, VBZ, WP$, WRB
    }

    private static final List<String> tags = Stream.of(POS_TAG.values())
            .map(Enum::name)
            .collect(Collectors.toList());
    private static final int MAX_NGRAM = 5;
    private static String[][] cands = {
            new String[3],
            new String[10],
            new String[32],
            new String[10]
    };

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();

        String location = conf.get("job.cands.path");

        if (location != null) {
            BufferedReader br = null;
            try {
                FileSystem fs = FileSystem.get(conf);
                Path path = new Path(location);

                if (fs.exists(path)) {
                    FSDataInputStream fis = fs.open(path);
                    br = new BufferedReader(new InputStreamReader(fis));

                    String line;
                    int i = 0;
                    while ((line = br.readLine()) != null) {
                        String[] splitted = line.split(" ");
                        cands[i] = splitted;
                        i++;
                    }

                }
            } catch (IOException e) {
                //
            } finally {
                br.close();
            }
        }
    }

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokens = value.toString().split(" ");
        int m = tokens.length;

        for (int n = 2; n <= MAX_NGRAM; n++) {
            for (int s = 0; s <= m - n; s++) {
                for (int i = 0; i < cands[n - 2].length; i++) {
                    List<String> pattern = new ArrayList<>();
                    List<String> metWords = new ArrayList<>();

                    for (int j = 0; j <= n - 1; j++) {
                        String[] pair = tokens[s + j].split("/");
                        String word = pair[0];
                        String pos = pair[1];

                        char c = cands[n - 2][i].charAt(j);
                        addToPattern(word, pos, c, pattern);
                        if (c > 0 && tags.contains(pos)) {
                            metWords.add(word);
                        }
                    }
                    if (metWords.isEmpty()) {
                        metWords.add("_NONE");
                    }

                    Text resultKey = new Text(pattern.toString() + ";" + metWords.toString());
                    context.write(resultKey, new Text(key.toString()));
                }
            }
        }


    }

    public void addToPattern(String word, String pos, char c, List<String> pattern) {
        switch (c) {
            case 'w':
                pattern.add(word);
                break;
            case 'p':
                pattern.add(pos);
                break;
            default:
                pattern.add("_WC_");
                break;
        }
    }
}



public class Main {


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("job.cands.path", "/user/thelfter/pwp");

        Job job1 = Job.getInstance(conf, "word pattern1");
        job1.setJarByClass(Main.class);
        job1.setMapperClass(FirstMapper.class);
        job1.setCombinerClass(FirstReducer.class);
        job1.setReducerClass(FirstReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path("/user/thelfter/output"));    
        System.exit(job1.waitForCompletion(true) ? 0 : 1);
    }
}

1 Ответ

0 голосов
/ 12 ноября 2019

Если вы используете YARN, то дисковое пространство диспетчера узлов контролируется yarn.nodemanager.local-dirs в вашем файле yarn-site.xml, поэтому все, что указывает, должно иметь достаточно дискового пространства.

...