Apace Spark RDD - unpersist () и использование в качестве локальной переменной - PullRequest
0 голосов
/ 04 июля 2019
SubjectMarks -> Table
SubjectName String,
Marks Integer,
ModifieDate Date 

PK: SubjectName,
CK: Marks

English , 92, 2019-07-01 06:13:14+0000
English, 90 , 2019-07-02 10:15:09+0000
English, 82 , 2019-07-01 11:24:05+0000
Maths, 33, 2019-07-02 10:15:09+0000
Maths, 45, 2019-07-02 10:15:09+0000
Science, 66, 2019-07-02 10:15:09+0000
Social Science, 65, 2019-07-02 10:15:09+0000


/**

 This method reads the subject marks table, and returns the data, which are modified after a particularTime

 If we pass time as 2019-07-02 00:00:00, excluding the 1st and the 3rd records, returns a Map<String,HashSet<Integer>>

I.e) subjects and the list of marks.

**/

public Map<String,HashSet<Integer>> getSubjectMarksAfterParticularTime(List<String> subjects, Date modifiedTime, final ReadConf config) {
    JavaRDD<SubjectMarksPOJO> subjectMarksRDD = CassandraJavaUtil.javaFunctions(sparkContext)
            .cassandraTable(connection.getKeyspaceName(), “SubjectMarks”,
                    CassandraJavaUtil.mapRowTo(SubjectMarksPOJO.class))
            .withConnector(connection.getCassandraConnector()).withReadConf(config)
            .where(getSubjects(subjects));

    JavaRDD<SubjectMarksPOJO> filteredSubjects = filterSubjectChanges(SubjectMarksRDD, modifiedTs);
    subjectMarksRDD.unpersist();
    JavaPairRDD<String, Integer> subjectsRDD = filteredSubjects.mapToPair(subject -> {
        return new Tuple2(subject.getSubjectName(), subject.getMarks());

    });
//filteredSubjects.unpersist();
    Map<String, HashSet<Integer>> sets = subjectsRDD.aggregateByKey(new HashSet<String>(),
            (a, b) -> {
                a.add(b);
                return a;
            },
            (a, b) -> {
                a.addAll(b);
                return a;
            }).collectAsMap();

  //subjectsRDD.unpersist();

    sets.entrySet().stream().forEach(en->System.out.println(en.getKey() + " - " + en.getValue()));

   return sets;

}

private JavaRDD<SubjectMarksPOJO> filterSubjectChanges(JavaRDD<SubjectMarksPOJO> allSubjects,final Date modifiedDate) {
        if (allSubjects == null) {
            throw new IllegalArgumentException(
                    "No Subjects Found“);
        }
    return allSubjects.filter(subject -> (
            subject.getModifiedDate().after(modifiedDate)));

}

public String getSubjects(List<String> subjects) {
    return “subjects IN(‘English’,’Maths’,’Science’)”;

}
  1. Это хорошая практика для создания всех JavaRDD, таких как SubjectMarksRDD, FilterSubjects, subjectRDD в качестве переменной экземпляра вместо локальной переменной ??Есть ли какая-то разница с использованием локальной переменной?
  2. Нужно ли вызывать RDD.unpersist () [закомментированный код в программе] после использования промежуточного RDD?Требуется ли это?

Я прочитал документацию по использованию unpersist ().Он звучит так: «Если вы хотите вручную удалить RDD вместо ожидания его падения из кэша, используйте метод RDD.unpersist ()»?

...