JavaPairRDD<Integer,Iterable<Long>> tripByKey= getData.mapToPair(line->
{
String[] parts=SPACES.split(line);
return new Tuple2<>(Integer.parseInt(parts[0]),sdf.parse(parts[4]).getTime());
}).groupByKey();
После преобразования groupByKey у нас будет пользователь (ключ), все даты для этого пользователя (значение как итеративное).
Теперь, чтобы получить количество поездок на пользователя, нам нужно будет отсортировать эти даты и использовать нашу логику для получения поездок на пользователя.
JavaPairRDD<Integer, Integer> tripsCountPerUser = tripByKey.mapValues(func);
Function<Iterable<Long>, Integer> fun = (Iterable<Long> itr ) -> {
List<Long> dates = new ArrayList<>();
for (Long i:itr) {
dates.add(i);
}
Collections.sort(dates);
long day = 86400000l ;
long days7 = day * 7;
int count = 0;
Long firstDay = null;
for (Long dt : dates) {
if(firstDay == null)
{
firstDay = dt;
count = 1;
}
else {
Long diffMs = dt - firstDay ;
if(diffMs > days7 ) {
firstDay = dt;
count ++;
}
}
}
return count;
};
Приведенный выше код не является оптимизированным кодом и может быть выполнен несколькими различными способами.
Приведенный выше код предназначен для решения проблемы.
Надеюсь, это поможет.
Предположение: -
2018-01-01, 2018-01-08, 2018-01-09, 2018-01-11 рассматривают две поездки, которые
- 2018-01-01, 2018-01-08 (в течение 7 дней)
- 2018-01-09, 2018-01-11 (7 дней)