При запуске программы я получаю приведённую ниже ошибку. Я пытаюсь вычислить среднее значение для DStream в формате (name, avg).
'Метод combineByKey(Function, Function2, Function2, Partitioner) в типе JavaPairDStream не применим для аргументов (Function, Function2, Function2)'
Пожалуйста, помогите.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.log4j.*;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.json.JSONArray;
import org.json.JSONObject;

import scala.Tuple2;
public class FirstSparkApplication {
@SuppressWarnings("serial")
public static class AvgCount implements java.io.Serializable {
public AvgCount(double total, int num) {
total_ = total;
num_ = num;
}
public double total_;
public int num_;
public double avg() {
return total_ / (double) num_;
}
}
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("FirstSparkApplication");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(20));
Logger.getRootLogger().setLevel(Level.ERROR);
Function2<Double, Double, Double> reduceFunc = new Function2<Double, Double,
Double>() {
public Double call(Double result, Double value)
throws Exception {
System.out.println("Reduce running");
System.out.println(result + "+" + value);
return result + value;
}
};
JavaDStream<String> lines = jssc.textFileStream("/home/dominic/Downloads/DATADIR").cache();
final String[] path = new String[]{ "/home/dominic/Downloads/OUTPUTDIR"};
JavaPairDStream<String, Double> pair = lines.flatMapToPair(
new PairFlatMapFunction<String, String, Double>() {
private static final long serialVersionUID = 67676744;
public Iterator<Tuple2<String, Double>> call(String t) throws Exception {
List<Tuple2<String, Double>> list = new ArrayList<Tuple2<String, Double>>();
JSONArray js1 = new JSONArray(t);
for (int i = 0; i < js1.length(); i++) {
String symbol = js1.getJSONObject(i).get("symbol")
.toString();
JSONObject jo = new JSONObject(js1.getJSONObject(i)
.get("priceData").toString());
list.add(new Tuple2<String, Double>(symbol,jo.getDouble("close")));
}
return list.iterator();
}
});
JavaPairDStream<String, Double> result=pair.reduceByKeyAndWindow(reduceFunc, Durations.seconds(100), Durations.seconds(60));
pair.print();
result.print();
//Average
Function<Double, AvgCount> createAcc = new Function<Double, AvgCount>() {
public AvgCount call(Double x) {
return new AvgCount(x, 1);
}
};
Function2<AvgCount, Double, AvgCount> addAndCount = new Function2<AvgCount, Double, AvgCount>() {
public AvgCount call(AvgCount a, Double x) {
a.total_ += x;
a.num_ += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total_ += b.total_;
a.num_ += b.num_;
return a;
}
};
AvgCount initial = new AvgCount(0,0);
JavaPairDStream<String, AvgCount> avgCounts = result.combineByKey(createAcc, addAndCount, combine);
// Map<String, AvgCount> countMap = avgCounts.collectAsMap();
// for (Entry<String, AvgCount> entry : countMap.entrySet()) {
// System.out.println(entry.getKey() + ":" + entry.getValue().avg());
jssc.start();
jssc.awaitTermination();
jssc.close();
}
}