фильтр KeyValueGrouped Dataset в искре - PullRequest
0 голосов
/ 01 октября 2018

У меня есть типизированный набор данных пользовательского класса и я использую метод groupbykey.Вы знаете, что это приводит к KeyValueGroupedDataset.Я хочу отфильтровать этот новый набор данных, но не существует метода фильтрации для этого типа набора данных.Итак, мой вопрос: как я могу фильтровать данные этого типа?(Требуется решение Java. Версия spark: 2.3.1).

sampleData:

"id":1,"fname":"Gale","lname":"Willmett","email":"gwillmett0@nhs.uk","gender":"Female"
"id":2,"fname":"Chantalle","lname":"Wilcher","email":"cwilcher1@blinklist.com","gender":"Female"
"id":3,"fname":"Polly","lname":"Grandisson","email":"pgrandisson2@linkedin.com","gender":"Female"
"id":3,"fname":"Moshe","lname":"Pink","email":"mpink3@twitter.com","gender":"Male"
"id":2,"fname":"Yorke","lname":"Ginnelly","email":"yginnelly4@apple.com","gender":"Male"

И что я сделал:

    Dataset<Person> peopleDS = spark.read().format("parquet").load("\path").as(Encoders.bean(Person.class));
    KeyValueGroupedDataset<String, Person> KVDS = peopleDS.groupByKey( (MapFunction<Person, String> ) f -> f.getGender() , Encoders.STRING());
//How Can I filter on KVDS's id field? 

Update1 (использование flatMapGroups):

Dataset<Person> persons = KVDS.flatMapGroups((FlatMapGroupsFunction <String,Person,Person>) (f,k) -> (Iterator<Person>) k ,  Encoders.bean(Person.class));

Update2 (использование MapGroups)

Dataset<Person> peopleMap = KVDS.mapGroups((MapGroupsFunction <String,Person,Person>) (f,g) -> {
        while (g.hasNext()) {
        //What can I do here?       
    }
},Encoders.bean(Person.Class); 

Update3 : я хочу отфильтровать те группы, которые отличаются от их идентификаторов больше 1. Напримерна рисунке ниже: я хочу только женские группы, потому что их идентификаторы больше 1 (первое поле - id. Остальные - fname, lname, email и пол).enter image description here

Update4: Я сделал то, что я хочу, с помощью «СДР», но я хочу сделать именно эту часть кода с помощью «набора данных»:

List<Tuple2<String, Iterable<Person>>> f = PersonRDD
        .mapToPair(s -> new Tuple2<>(s.getGender(), s)).groupByKey()
        .filter(t -> ((Collection<Person>) t._2()).stream().mapToInt(e -> e.getId).distinct().count() > 1)
        .collect();

Ответы [ 2 ]

0 голосов
/ 01 октября 2018

Почему вы не фильтруете по идентификатору перед группировкой?GroupByKey - это дорогостоящее действие, оно должно быть быстрее для фильтрации вначале.

Если вы действительно хотите сначала группировать, вам, возможно, придется использовать .flatMapGroups с функцией идентификации.

Не уверен насчет javaкод, но версия Scala будет выглядеть следующим образом:

peopleDS
.groupByKey(_.gender)
.mapGroups { case (gender, persons) => persons.filter(your condition) }

Но опять же, вы должны сначала фильтровать :).Тем более что ваше поле идентификатора уже доступно до группировки.

0 голосов
/ 01 октября 2018

Группировка используется для функций агрегирования, вы можете найти такие функции, как «agg» в классе «KeyValueGroupedDataset».Если вы применяете функцию агрегирования для напр."count", вы получите "Dataset", и будет доступна функция "filter".

"groupBy" без функции агрегирования выглядит странно, другая функция, например.Можно использовать «отдельный».

Пример фильтрации с помощью функции FlatMapGroupsFunction:

                .flatMapGroups(
                    (FlatMapGroupsFunction<String, Person, Person>) (f, k) -> {
                        List<Person> result = new ArrayList<>();
                        while (k.hasNext()) {
                            Person value = k.next();
                            // filter condition here
                            if (value != null) {
                                result.add(value);
                            }
                        }
                        return result.iterator();
                    },
                    Encoders.bean(Person.class))
...