Hadoop: Как сохранить счетчик для выполнения метода датоде? - PullRequest
0 голосов
/ 31 января 2019

Я хотел бы знать, как вести подсчет числа раз, когда метод выполнялся на конкретном датоделе в задании MapReduce.

К сожалению, с кодом, который я разработал до сих пор, я получаю очень неблагоприятные результаты.Переменная count, передаваемая с помощью метода makeRequest, ведет себя очень нерационально (и создает повторяющийся шаблон).Результаты можно посмотреть здесь: results

«Количество» должно увеличиваться только для любого заданного задания MapReduce.Мне кажется, что важно отметить, что основным входным файлом, который я использую, являются просто числа: 0 - 750 000 (по одному на строку).Желаемый конечный результат должен содержать 750 000 сообщений на сервере.

Просто чтобы немного рассказать об истории: в настоящее время я разрабатываю программу MapReduce для простых чисел, цель которой - периодически (на основе таймера) передавать информацию в терминах «числа» чисел, обрабатываемых на сервере.,Сервер размещен на главном узле и предназначен для отображения результатов работы в режиме реального времени.

Пожалуйста, извините, если я допустил какие-либо вопиющие ошибки в моем понимании Hadoop Framework, к сожалению, я все еще новичок в этом и все еще учусь.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.net.InetAddress;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.UnknownHostException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Timer;
import java.util.TimerTask; 

/** Hadoop MapReduce program to compute the prime numbers based on a given range provided within the input file. */

public final class Primes {

public static int counter = 0; 
public static boolean created = false;

/**
 * Defines Job Configuration
 */
public final static void main(final String[] args) throws Exception {
    final Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://master:9000");
    conf.set("mapreduce.jobtracker.address", "master:5431");
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.address", "master:8050");
    final Job job = new Job(conf, "Primes");
    job.setJarByClass(Primes.class);

    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(PrimesMap.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}

/**
 * Creates a timer which processes a GET request to the
 * hosted server, repeating every five seconds.
 */
public final static void createRequestTimer(){
    Timer timer = new Timer(); 
    TimerTask request = new TimerTask() {
        @Override
        public void run(){ makeRequest(counter);}
    };

    int delay = 3000; 
    int period = 5000;
    timer.scheduleAtFixedRate(request, delay, period); 
}

public static final class PrimesMap extends Mapper<LongWritable, Text, NullWritable, IntWritable> {
    final NullWritable nw = NullWritable.get(); 
    public final void map(final LongWritable key, final Text value, final Context context)
            throws IOException, InterruptedException {
        final int number = Integer.parseInt(value.toString());

        /**
         * Creates a timer the first time this method is executed. Ensures that only one
         * timer will be maintained.
         */
        if(created == false){
            createRequestTimer(); 
            created = true; 
        }

        /**
         * Checks to see if the number is in fact prime
         */
        if(isPrime(number)) {
            context.write(nw, new IntWritable(number));
        }

    }
}

/**
 * Basic primality test
 */
private static final boolean isPrime(final int number) {
    //Function should be performed on all numbers, and therefore
    //Can be incremented within this function (as the first step)
    counter++;
    if(number <= 1){
        return false; 
    }

    if(number == 2) {
        return true; 
    }

    if(number % 2 == 0){
        return false; 
    }

    for(int i = 3; i <= Math.sqrt(number) + 1; i = i + 2){
        if(number % i == 0){
            return false; 
        }
    }
    return true;
}

/**
 * Based on the counter parameter, a get request will be made to the
 * server. This method is effectively used to relay the number of numbers
 * that have been processed by this particular node to the server (which 
 * then goes on to display results in real time). 
 */
private static void makeRequest(int counter){
    String url = "http://192.168.1.2:5000/add/1/" + counter ;

    try {
        String IP = InetAddress.getLocalHost().toString();
        if(IP.contains("192.168.1.3")){
            url = "http://192.168.1.2:5000/add/1/" + counter; 
        }
        else if(IP.contains("192.168.1.4")){
            url = "http://192.168.1.2:5000/add/2/" + counter;
        }
        else if(IP.contains("192.168.1.5")){
            url = "http://192.168.1.2:5000/add/3/" + counter;
        }
        else if(IP.contains("192.168.1.6")){
            url = "http://192.168.1.2:5000/add/4/" + counter;
        }
        else if(IP.contains("192.168.1.7")){
            url = "http://192.168.1.2:5000/add/5/" + counter;
        }
        else if(IP.contains("192.168.1.8")){
            url = "http://192.168.1.2:5000/add/6/" + counter;
        }
        else if(IP.contains("192.168.1.9")){
            url = "http://192.168.1.2:5000/add/7/" + counter;
        }
        else if(IP.contains("192.168.1.10")){
            url = "http://192.168.1.2:5000/add/8/" + counter;
        }

        URL myurl = new URL(url);         
        HttpURLConnection con = (HttpURLConnection) myurl.openConnection();            
        con.setRequestMethod("GET");
        con.getInputStream(); 
        con.disconnect();
    } catch (Exception e){
        e.printStackTrace();
    }
}

}

Текущие результаты показывают, что представляемая переменная "count" увеличивается и уменьшается в значении (что неверно).

Желаемые результаты для задания таковы, что значение после «GET / add / 1 /» никогда не уменьшается и увеличивается только в течение задания MapReduce (столько раз, сколько isPrime ())метод называется).Если бы я мог получить некоторую помощь, это было бы очень ценно!:)

Еще раз повторюсь, я хотел бы знать: Как я могу подсчитать, сколько раз каждая датодана выполняет определенный метод в рамках Hadoop

Ошибка в предоставленном коде лежит исключительно в «counter» и в том, как она увеличивается с помощью функций «mapper» и «isPrime».Я не уверен, почему переменная 'counter' уменьшается на протяжении всей программы.

1 Ответ

0 голосов
/ 31 января 2019

Он имеет 3 потока.
, если вы хотите, чтобы интервал обновления составлял 5 секунд, это нормально.
, если эти изменения возможны, это то, что я рассмотрю.
на стороне агрегатора.сервер, который принимает / add / 1 ...., начинает его с 0 и добавляет отправленное значение.
рабочая сторона.пусть он сбрасывает значение каждый раз, когда отправляет

, просмотр инструкций журнала из ранней части цикла может показать, что происходит лучше.

...