Я пытаюсь создать искровое приложение, которое получает набор данных lat , long , timestamp points и увеличивает количество ячеек, если они находятся внутри ячейка сетки. Сетка состоит из 3d ячеек с lon , lat и time в качестве оси z.
Теперь я заполнил приложение, и оно делает то, что должно, но для сканирования всего набора данных требуются часы (~ 9g). Мой кластер состоит из 3 узлов с 4 ядрами по 8 ГБ каждый, и в настоящее время я использую 6 исполнителей с 1 ядром и 2G каждый.
Я предполагаю, что могу немного оптимизировать код, но есть ли большая ошибка в моем коде, которая приводит к этой задержке?
//Create a JavaPairRDD with tuple elements. For each String line of lines we split the string
//and assign latitude, longitude and timestamp of each line to sdx,sdy and sdt. Then we check if the data point of
//that line is contained in a cell of the centroids list. If it is then a new tuple is returned
//with key the latitude, Longitude and timestamp (split by ",") of that cell and value 1.
JavaPairRDD<String, Integer> pairs = lines.mapToPair(x -> {
String sdx = x.split(" ")[2];
String sdy = x.split(" ")[3];
String sdt = x.split(" ")[0];
double dx = Double.parseDouble(sdx);
double dy = Double.parseDouble(sdy);
int dt = Integer.parseInt(sdt);
List<Integer> t = brTime.getValue();
List<Point2D.Double> p = brCoo.getValue();
double dist = brDist.getValue();
int dur = brDuration.getValue();
for(int timeCounter=0; timeCounter<t.size(); timeCounter++) {
for ( int cooCounter=0; cooCounter < p.size(); cooCounter++) {
double cx = p.get(cooCounter).getX();
double cy = p.get(cooCounter).getY();
int ct = t.get(timeCounter);
String scx = Double.toString(cx);
String scy = Double.toString(cy);
String sct = Integer.toString(ct);
if (dx > (cx-dist) && dx <= (cx+dist)) {
if (dy > (cy-dist) && dy <= (cy+dist)) {
if (dt > (ct-dur) && dt <= (ct+dur)) {
return new Tuple2<String, Integer>(scx+","+scy+","+sct,1);
}
}
}
}
}
return new Tuple2<String, Integer>("Out Of Bounds",1);
});