У меня есть типизированный набор данных пользовательского класса и я использую метод groupbykey.Вы знаете, что это приводит к KeyValueGroupedDataset.Я хочу отфильтровать этот новый набор данных, но не существует метода фильтрации для этого типа набора данных.Итак, мой вопрос: как я могу фильтровать данные этого типа?(Требуется решение Java. Версия spark: 2.3.1).
sampleData:
"id":1,"fname":"Gale","lname":"Willmett","email":"gwillmett0@nhs.uk","gender":"Female"
"id":2,"fname":"Chantalle","lname":"Wilcher","email":"cwilcher1@blinklist.com","gender":"Female"
"id":3,"fname":"Polly","lname":"Grandisson","email":"pgrandisson2@linkedin.com","gender":"Female"
"id":3,"fname":"Moshe","lname":"Pink","email":"mpink3@twitter.com","gender":"Male"
"id":2,"fname":"Yorke","lname":"Ginnelly","email":"yginnelly4@apple.com","gender":"Male"
И что я сделал:
Dataset<Person> peopleDS = spark.read().format("parquet").load("\path").as(Encoders.bean(Person.class));
KeyValueGroupedDataset<String, Person> KVDS = peopleDS.groupByKey( (MapFunction<Person, String> ) f -> f.getGender() , Encoders.STRING());
//How Can I filter on KVDS's id field?
Update1 (использование flatMapGroups):
Dataset<Person> persons = KVDS.flatMapGroups((FlatMapGroupsFunction <String,Person,Person>) (f,k) -> (Iterator<Person>) k , Encoders.bean(Person.class));
Update2 (использование MapGroups)
Dataset<Person> peopleMap = KVDS.mapGroups((MapGroupsFunction <String,Person,Person>) (f,g) -> {
while (g.hasNext()) {
//What can I do here?
}
},Encoders.bean(Person.Class);
Update3 : я хочу отфильтровать те группы, которые отличаются от их идентификаторов больше 1. Напримерна рисунке ниже: я хочу только женские группы, потому что их идентификаторы больше 1 (первое поле - id. Остальные - fname, lname, email и пол).
Update4: Я сделал то, что я хочу, с помощью «СДР», но я хочу сделать именно эту часть кода с помощью «набора данных»:
List<Tuple2<String, Iterable<Person>>> f = PersonRDD
.mapToPair(s -> new Tuple2<>(s.getGender(), s)).groupByKey()
.filter(t -> ((Collection<Person>) t._2()).stream().mapToInt(e -> e.getId).distinct().count() > 1)
.collect();