SubjectMarks -> Table
SubjectName String,
Marks Integer,
ModifieDate Date
PK: SubjectName,
CK: Marks
English , 92, 2019-07-01 06:13:14+0000
English, 90 , 2019-07-02 10:15:09+0000
English, 82 , 2019-07-01 11:24:05+0000
Maths, 33, 2019-07-02 10:15:09+0000
Maths, 45, 2019-07-02 10:15:09+0000
Science, 66, 2019-07-02 10:15:09+0000
Social Science, 65, 2019-07-02 10:15:09+0000
/**
This method reads the subject marks table, and returns the data, which are modified after a particularTime
If we pass time as 2019-07-02 00:00:00, excluding the 1st and the 3rd records, returns a Map<String,HashSet<Integer>>
I.e) subjects and the list of marks.
**/
public Map<String,HashSet<Integer>> getSubjectMarksAfterParticularTime(List<String> subjects, Date modifiedTime, final ReadConf config) {
JavaRDD<SubjectMarksPOJO> subjectMarksRDD = CassandraJavaUtil.javaFunctions(sparkContext)
.cassandraTable(connection.getKeyspaceName(), “SubjectMarks”,
CassandraJavaUtil.mapRowTo(SubjectMarksPOJO.class))
.withConnector(connection.getCassandraConnector()).withReadConf(config)
.where(getSubjects(subjects));
JavaRDD<SubjectMarksPOJO> filteredSubjects = filterSubjectChanges(SubjectMarksRDD, modifiedTs);
subjectMarksRDD.unpersist();
JavaPairRDD<String, Integer> subjectsRDD = filteredSubjects.mapToPair(subject -> {
return new Tuple2(subject.getSubjectName(), subject.getMarks());
});
//filteredSubjects.unpersist();
Map<String, HashSet<Integer>> sets = subjectsRDD.aggregateByKey(new HashSet<String>(),
(a, b) -> {
a.add(b);
return a;
},
(a, b) -> {
a.addAll(b);
return a;
}).collectAsMap();
//subjectsRDD.unpersist();
sets.entrySet().stream().forEach(en->System.out.println(en.getKey() + " - " + en.getValue()));
return sets;
}
private JavaRDD<SubjectMarksPOJO> filterSubjectChanges(JavaRDD<SubjectMarksPOJO> allSubjects,final Date modifiedDate) {
if (allSubjects == null) {
throw new IllegalArgumentException(
"No Subjects Found“);
}
return allSubjects.filter(subject -> (
subject.getModifiedDate().after(modifiedDate)));
}
public String getSubjects(List<String> subjects) {
return “subjects IN(‘English’,’Maths’,’Science’)”;
}
- Это хорошая практика для создания всех JavaRDD, таких как SubjectMarksRDD, FilterSubjects, subjectRDD в качестве переменной экземпляра вместо локальной переменной ??Есть ли какая-то разница с использованием локальной переменной?
- Нужно ли вызывать RDD.unpersist () [закомментированный код в программе] после использования промежуточного RDD?Требуется ли это?
Я прочитал документацию по использованию unpersist ().Он звучит так: «Если вы хотите вручную удалить RDD вместо ожидания его падения из кэша, используйте метод RDD.unpersist ()»?