Я хочу построить Mapreduce, который имеет
Ввод :
key1 \ т 3,2 | 17412 | 553 | 15186,19199 | 15186,3947 | 15186,5938 | 15186,15517
key2 \ t925 | 10295 | 65182555,7344 | 7344,925 | 10295,7344 | 3,2 key3 \ t8747 | 18466 | 13289 | 3,2 | 13289,5106 | 12222 , 5106 | 5106,6374
........
Выход : min \ t (2,3), который является пересечением между каждым элементом value1, каждым элементом value2, .... и valueN.
Итак, я спроектировал свои мапперы так, чтобы
mapper1 будет содержать пересечения между значениями key1, key2, key3,
mapper2 будет содержать пересечения между значениями key4, key5, key6 ...
.......
Затем мои редукторы снова получают результаты от этих картографов, чтобы найти окончательные пересечения. Итак, в основном мой маппер и редуктор используют один и тот же код. В моем коде я нахожу пересечение последовательно, то есть сначала находим пересечение между значением1 и значением2, затем результат будет использоваться для пересечения со значением3 и так далее.
Мой картограф.
Mapper-Code1:
public static class MapAPP extends Mapper<Text, Text, Text, Text>{
public static int j=0,k=0;
public static List<String> min_pre = new ArrayList<>();
public static List<String> min_current = new ArrayList<>();
public static Set<String> min_p1 = new HashSet<>();
public static Set<String> min_c1 = new HashSet<>();
public static List<String> min_result = new ArrayList<>();
public static Boolean no_exist_min=false;
public void map(Text key, Text value, Context con) throws IOException, InterruptedException
{
String[] v=value.toString().split("\t");
// aggregate min
if (no_exist_min==false){
if (j==0){
min_pre= Arrays.asList(v[1].toString().trim().split("\\|"));
j=1;
}else{
min_current= Arrays.asList(v[1].toString().trim().split("\\|"));
for (String p: min_pre){
min_p1 = new HashSet<String>(Arrays.asList(p.split(",")));
for (String c: min_current){
min_c1 = new HashSet<String>(Arrays.asList(c.split(",")));
min_c1.retainAll(min_p1);
if (!min_c1.isEmpty()){
Joiner m_comma = Joiner.on(",").skipNulls();
String buff = m_comma.join(min_c1);
if (!min_result.contains(buff))
min_result.add(buff);
}
}
}
if (min_result.isEmpty()){
no_exist_min=true;
} else {
min_pre=new ArrayList(min_result);
min_result.clear();
}
}
}
}
protected void cleanup(Context con) throws IOException, InterruptedException {
Joiner m_pipe = Joiner.on("|").skipNulls();
if (no_exist_min==true){
con.write(new Text("min"), new Text("no_exist"));
}else {
String min_str = m_pipe.join(min_pre);
con.write(new Text("min"), new Text(min_str));
}
}
}
Мой редуктор (почти совпадает с Mapper):
public static class ReduceAPP extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values, Context con) throws IOException, InterruptedException
{
List<String> pre = new ArrayList<>();
List<String> current = new ArrayList<>();
Set<String> p1 = new HashSet<>();
Set<String> c1 = new HashSet<>();
List<String> result = new ArrayList<>();
Joiner comma = Joiner.on(",").skipNulls();
Joiner pipe = Joiner.on("|").skipNulls();
Boolean no_exist=false;
int i=0;
// aggregate
for(Text value: values){
if (value.toString().trim()=="no_exist"){
no_exist=true;
break;
}
if (i==0){
pre= Arrays.asList(value.toString().trim().split("\\|"));
i=1;
}else{
current= Arrays.asList(value.toString().trim().split("\\|"));
for (String p: pre){
p1 = new HashSet<String>(Arrays.asList(p.split(",")));
for (String c: current){
c1 = new HashSet<String>(Arrays.asList(c.split(",")));
c1.retainAll(p1);
if (!c1.isEmpty()){
String buff = comma.join(c1);
if (!result.contains(buff))
result.add(buff);
}
}
}
if (result.isEmpty()){
no_exist=true;
break;
}
pre=new ArrayList(result);
result.clear();
}
}
if (no_exist==true){
con.write(key, new Text("no_exist"));
}
else{
String preStr = pipe.join(pre);
con.write(key, new Text(preStr));
}
}
public static <T> Set<T> union(Set<T> setA, Set<T> setB) {
Set<T> tmp = new TreeSet<T>(setA);
tmp.addAll(setB);
return tmp;
}
}
Я отлично запускаю небольшие входные файлы, но всегда не хватает памяти в больших файлах (~ 450 МБ текстовый файл) Итак, я сомневаюсь, что мой код Java не эффективность памяти. В моих Редукторах я использовал все локальные переменные, и эти переменные будут уничтожены, когда эти функции Редуктора завершатся, поэтому я не беспокоюсь о Редукторах. Но в моем Mapper я должен использовать статические переменные. В своем Mapper-code1 я использовал все статические переменные, тогда как в своем Mapper-code2 я пытался использовать как можно меньше статических переменных.
У меня есть два вопроса?
1) В моем Mapper-code1 каждая статическая переменная распределяется между мапперами или она предназначена исключительно для 1 маппера? Например, предположим, у меня есть 5 мапперов, будет ли создан список 1 min_pre и будет предоставлен общий доступ между 5 мапперами или будет 5 списков min_pre для 5 мапперов?
Что я хочу, так это последнее. Как спроектировать мой маппер так, чтобы, если у меня было 5 мапперов, было 5 списков min_pre?
2) Mapper-code1 и Mapper-code2, которые занимают меньше памяти?
Mapper-Кодекса2:
public static class MapAPP extends Mapper<Text, Text, Text, Text>{
public static int j=0,k=0;
public static List<String> min_pre = new ArrayList<>();
public static List<String> min_result = new ArrayList<>();
public static Boolean no_exist_min=false;
public void map(Text key, Text value, Context con) throws IOException, InterruptedException
{
String[] v=value.toString().split("\t");
// aggregate min
if (no_exist_min==false){
if (j==0){
min_pre= Arrays.asList(v[1].toString().trim().split("\\|"));
j=1;
}else{
List<String> min_current= Arrays.asList(v[1].toString().trim().split("\\|"));
for (String p: min_pre){
Set<String> min_p1 = new HashSet<String>(Arrays.asList(p.split(",")));
for (String c: min_current){
Set<String> min_c1 = new HashSet<String>(Arrays.asList(c.split(",")));
min_c1.retainAll(min_p1);
if (!min_c1.isEmpty()){
Joiner m_comma = Joiner.on(",").skipNulls();
String buff = m_comma.join(min_c1);
if (!min_result.contains(buff))
min_result.add(buff);
}
}
}
if (min_result.isEmpty()){
no_exist_min=true;
} else {
min_pre=new ArrayList(min_result);
min_result.clear();
}
}
}
}
protected void cleanup(Context con) throws IOException, InterruptedException {
Joiner m_pipe = Joiner.on("|").skipNulls();
if (no_exist_min==true){
con.write(new Text("min"), new Text("no_exist"));
}else {
String min_str = m_pipe.join(min_pre);
con.write(new Text("min"), new Text(min_str));
}
}
}