Функция MapReduce для поиска пересечения множеств - PullRequest
0 голосов
/ 23 мая 2018

Что я хочу:

для разработки функции MapReduce, которая имеет

Ввод:

key1 \ t A1 \ t B1

key2 \ t A2 \ t B2

key3 \ t A3 \ t B3

....

key30 \ t A30 \ t B30

Желаемый выход:

"мин" \ t пересекаются (A1, A2, A3 ...)

"макс" \ t пересекаются (B1, B2, B3, ...)

где A1, A2, A3, .. B1, B2, B3 - наборы наборов.

Что я сделал :

Я проектирую Mapper иРедуктор, как показано ниже.

Mapper:

 public static class MapAPP extends Mapper<Text, Text, Text, Text>{     

    public static int j=0,k=0;
    public static List<String> min_pre = new ArrayList<>();
    public static List<String> min_current = new ArrayList<>();
    public static Set<String> min_p1 = new HashSet<>();
    public static Set<String> min_c1 = new HashSet<>();
    public static List<String> min_result = new ArrayList<>(); 
    public static Boolean no_exist_min=false;

    public static List<String> max_pre = new ArrayList<>();
    public static List<String> max_current = new ArrayList<>();
    public static Set<String> max_p1 = new HashSet<>();
    public static Set<String> max_c1 = new HashSet<>();
    public static List<String> max_result = new ArrayList<>(); 
    public static Boolean no_exist_max=false;

    public void map(Text key, Text value, Context con) throws IOException, InterruptedException
    {
        String[] v=value.toString().split("\t");
        // aggregate min
        if (no_exist_min==false){
            if (j==0){
                    min_pre= Arrays.asList(v[0].toString().trim().split("\\|"));
                    j=1;
                 }else{
                    min_current= Arrays.asList(v[0].toString().trim().split("\\|")); 
                    for (String p: min_pre){                   
                       min_p1 = new HashSet<String>(Arrays.asList(p.split(",")));
                       for (String c: min_current){
                           min_c1 = new HashSet<String>(Arrays.asList(c.split(",")));
                           min_c1.retainAll(min_p1);
                           if (!min_c1.isEmpty()){
                               Joiner m_comma = Joiner.on(",").skipNulls();
                               String buff = m_comma.join(min_c1);
                               if (!min_result.contains(buff))
                                    min_result.add(buff);
                           }                       
                       }                   
                    }
                    if (min_result.isEmpty()){
                        no_exist_min=true;          
                    } else {                    
                        min_pre=new ArrayList(min_result);
                        min_result.clear();                       
                    }
            }                   
        }

        //aggregate max
        if (no_exist_max==false){
            if (k==0){
                    max_pre= Arrays.asList(v[1].toString().trim().split("\\|"));
                    k=1;
                 }else{
                    max_current= Arrays.asList(v[1].toString().trim().split("\\|")); 
                    for (String p: max_pre){                   
                       max_p1 = new HashSet<String>(Arrays.asList(p.split(",")));
                       for (String c: max_current){
                           max_c1 = new HashSet<String>(Arrays.asList(c.split(",")));
                           max_c1.retainAll(max_p1);
                           if (!max_c1.isEmpty()){
                               Joiner m_comma = Joiner.on(",").skipNulls();
                               String buff = m_comma.join(max_c1);
                               if (!max_result.contains(buff))
                                    max_result.add(buff);
                           }                       
                       }                   
                    }
                    if (max_result.isEmpty()){
                        no_exist_max=true;          
                    } else {                    
                        max_pre=new ArrayList(max_result);
                        max_result.clear();                       
                    }
            }                   
        }

    }

    protected void cleanup(Context con) throws IOException, InterruptedException {
        Joiner m_pipe = Joiner.on("|").skipNulls();
        if (no_exist_min==true){
            con.write(new Text("min"), new Text("no_exist"));
        }else {               
            String min_str = m_pipe.join(min_pre);
            con.write(new Text("min"), new Text(min_str)); 

        }

        if (no_exist_max==true){
            con.write(new Text("max"), new Text("no_exist"));
        }else {
            String max_str = m_pipe.join(max_pre);                
            con.write(new Text("max"), new Text(max_str));                
        }            
        min_p1.clear();
        min_c1.clear();
        min_result.clear();

        max_p1.clear();
        max_c1.clear();
        max_result.clear();
    }
}

Редуктор:

public static class ReduceAPP extends Reducer<Text, Text, Text, Text>
{
    public void reduce(Text key, Iterable<Text> values, Context con) throws IOException, InterruptedException
    {
        List<String> pre = new ArrayList<>();
        List<String> current = new ArrayList<>();
        Set<String> p1 = new HashSet<>();
        Set<String> c1 = new HashSet<>();
        List<String> result = new ArrayList<>();
        Joiner comma = Joiner.on(",").skipNulls(); 
        Joiner pipe = Joiner.on("|").skipNulls(); 
        Boolean no_exist=false;
        String preStr="";
        int i=0;
        // aggregate
        for(Text value: values){
             if (value.toString().trim()=="no_exist"){
                 no_exist=true;
                 break;
                }
             if (i==0){
                    pre= Arrays.asList(value.toString().trim().split("\\|"));
                    i=1;
             }else{
                    current= Arrays.asList(value.toString().trim().split("\\|")); 
                    for (String p: pre){                   
                       p1 = new HashSet<String>(Arrays.asList(p.split(",")));
                       for (String c: current){
                           c1 = new HashSet<String>(Arrays.asList(c.split(",")));
                           c1.retainAll(p1);
                           if (!c1.isEmpty()){
                               String buff = comma.join(c1);
                               if (!result.contains(buff))
                                    result.add(buff);
                           }                       
                       }                   
                    }
                    if (result.isEmpty()){
                        no_exist=true;
                        break;
                    }
                    pre=new ArrayList(result);
                    result.clear();                       
             }                   

        }
        if (no_exist==true){
            con.write(key, new Text("no_exist"));
        }
        else{
            preStr = pipe.join(pre);
            con.write(key, new Text(preStr)); 
        }
        System.out.println("Reducefinished: key="+key.toString()+", value= "+preStr);
    }
    public static <T> Set<T> union(Set<T> setA, Set<T> setB) {
        Set<T> tmp = new TreeSet<T>(setA);
        tmp.addAll(setB);
        return tmp;
    }
}

Приведенные ниже коды выглядят сложными, но идея очень проста.Чтобы вычислить пересечение (A1, A2, A3), я сначала вычисляю A12 = пересечение (A1, A2), затем вычисляю A13 = пересечение (A12, A3) и так далее до конца.Я делаю то же самое, чтобы пересечь (B1, B2, B3).На самом деле мой код Mapper и Reduce почти одинаков.Разница в том, где я выводил: в функции очистки каждого картографа, а в функции уменьшения моего редуктора.

Для вас легко представить:

Вывод моего маппера:

Mapper1:

min \ t A16 = пересечение (A1, A2, .. A6)

max \ t B16 = пересечение (B1, B2, .. B6)

Mapper2:

min \ t A715 = пересечение (A7, A8, .. A15)

max \ t B715 = пересечение (B7, B8, .. B15)

Mapper3:

min \ t A1622 = пересечение (A16, A17, .. A22)

max \ t B1622 = пересечение (B16, B17, .. B22)

Mapper4:

min \ t A2330 = пересечение (A23, A24, .. A30)

max \ t B2330 = пересечение (B23, B24, .. B30)

Мой выход редуктора:

Редуктор1: мин \ t A130 = пересечение (A16, A715, A1622, A2330)

Редуктор2: max \ t B130 = пересечение (B16, B715, B1622, B2330)

Задача

Редуктор1 рассчитандовольно легко из-за меньшего количества наборов в A. Но из-за размера B >> размера A, поэтому Reducer2 занимает много времени, и иногда я получаю ошибку переполнения кучи.Что я могу сделать, чтобы ускорить процесс Reducer2?

Большое спасибо.

...