Я хотел бы знать, как вести подсчет числа раз, когда метод выполнялся на конкретном датоделе в задании MapReduce.
К сожалению, с кодом, который я разработал до сих пор, я получаю очень неблагоприятные результаты.Переменная count, передаваемая с помощью метода makeRequest, ведет себя очень нерационально (и создает повторяющийся шаблон).Результаты можно посмотреть здесь:
«Количество» должно увеличиваться только для любого заданного задания MapReduce.Мне кажется, что важно отметить, что основным входным файлом, который я использую, являются просто числа: 0 - 750 000 (по одному на строку).Желаемый конечный результат должен содержать 750 000 сообщений на сервере.
Просто чтобы немного рассказать об истории: в настоящее время я разрабатываю программу MapReduce для простых чисел, цель которой - периодически (на основе таймера) передавать информацию в терминах «числа» чисел, обрабатываемых на сервере.,Сервер размещен на главном узле и предназначен для отображения результатов работы в режиме реального времени.
Пожалуйста, извините, если я допустил какие-либо вопиющие ошибки в моем понимании Hadoop Framework, к сожалению, я все еще новичок в этом и все еще учусь.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.net.InetAddress;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.UnknownHostException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Timer;
import java.util.TimerTask;
/** Hadoop MapReduce program to compute the prime numbers based on a given range provided within the input file. */
public final class Primes {
public static int counter = 0;
public static boolean created = false;
/**
* Defines Job Configuration
*/
public final static void main(final String[] args) throws Exception {
final Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://master:9000");
conf.set("mapreduce.jobtracker.address", "master:5431");
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.address", "master:8050");
final Job job = new Job(conf, "Primes");
job.setJarByClass(Primes.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(PrimesMap.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
/**
* Creates a timer which processes a GET request to the
* hosted server, repeating every five seconds.
*/
public final static void createRequestTimer(){
Timer timer = new Timer();
TimerTask request = new TimerTask() {
@Override
public void run(){ makeRequest(counter);}
};
int delay = 3000;
int period = 5000;
timer.scheduleAtFixedRate(request, delay, period);
}
public static final class PrimesMap extends Mapper<LongWritable, Text, NullWritable, IntWritable> {
final NullWritable nw = NullWritable.get();
public final void map(final LongWritable key, final Text value, final Context context)
throws IOException, InterruptedException {
final int number = Integer.parseInt(value.toString());
/**
* Creates a timer the first time this method is executed. Ensures that only one
* timer will be maintained.
*/
if(created == false){
createRequestTimer();
created = true;
}
/**
* Checks to see if the number is in fact prime
*/
if(isPrime(number)) {
context.write(nw, new IntWritable(number));
}
}
}
/**
* Basic primality test
*/
private static final boolean isPrime(final int number) {
//Function should be performed on all numbers, and therefore
//Can be incremented within this function (as the first step)
counter++;
if(number <= 1){
return false;
}
if(number == 2) {
return true;
}
if(number % 2 == 0){
return false;
}
for(int i = 3; i <= Math.sqrt(number) + 1; i = i + 2){
if(number % i == 0){
return false;
}
}
return true;
}
/**
* Based on the counter parameter, a get request will be made to the
* server. This method is effectively used to relay the number of numbers
* that have been processed by this particular node to the server (which
* then goes on to display results in real time).
*/
private static void makeRequest(int counter){
String url = "http://192.168.1.2:5000/add/1/" + counter ;
try {
String IP = InetAddress.getLocalHost().toString();
if(IP.contains("192.168.1.3")){
url = "http://192.168.1.2:5000/add/1/" + counter;
}
else if(IP.contains("192.168.1.4")){
url = "http://192.168.1.2:5000/add/2/" + counter;
}
else if(IP.contains("192.168.1.5")){
url = "http://192.168.1.2:5000/add/3/" + counter;
}
else if(IP.contains("192.168.1.6")){
url = "http://192.168.1.2:5000/add/4/" + counter;
}
else if(IP.contains("192.168.1.7")){
url = "http://192.168.1.2:5000/add/5/" + counter;
}
else if(IP.contains("192.168.1.8")){
url = "http://192.168.1.2:5000/add/6/" + counter;
}
else if(IP.contains("192.168.1.9")){
url = "http://192.168.1.2:5000/add/7/" + counter;
}
else if(IP.contains("192.168.1.10")){
url = "http://192.168.1.2:5000/add/8/" + counter;
}
URL myurl = new URL(url);
HttpURLConnection con = (HttpURLConnection) myurl.openConnection();
con.setRequestMethod("GET");
con.getInputStream();
con.disconnect();
} catch (Exception e){
e.printStackTrace();
}
}
}
Текущие результаты показывают, что представляемая переменная "count" увеличивается и уменьшается в значении (что неверно).
Желаемые результаты для задания таковы, что значение после «GET / add / 1 /» никогда не уменьшается и увеличивается только в течение задания MapReduce (столько раз, сколько isPrime ())метод называется).Если бы я мог получить некоторую помощь, это было бы очень ценно!:)
Еще раз повторюсь, я хотел бы знать: Как я могу подсчитать, сколько раз каждая датодана выполняет определенный метод в рамках Hadoop
Ошибка в предоставленном коде лежит исключительно в «counter» и в том, как она увеличивается с помощью функций «mapper» и «isPrime».Я не уверен, почему переменная 'counter' уменьшается на протяжении всей программы.