Hazelcast Jet - вариант использования по группам - PullRequest
0 голосов
/ 30 июня 2019

У нас есть требование динамически группировать по нескольким полям на огромном наборе данных. Данные хранятся в кластере Hazelcast Jet. Пример: если класс Person содержит 4 поля: age, name, city и country. Сначала нужно сгруппировать по городам, а затем по странам, а затем мы можем сгруппировать по именам на основе условных параметров.

Мы уже пытались использовать распределенную коллекцию и не работают. Даже когда мы пытались использовать Pipeline API, он выдает ошибку.

Код:

    IMap res= client.getMap("res"); // res is distrbuted map
    Pipeline p = Pipeline.create();
    JobConfig jobConfig = new JobConfig();
    p.drawFrom(Sources.<Person>list("inputList"))
     .aggregate(AggregateOperations.groupingBy(Person::getCountry))
     .drainTo(Sinks.map(res));      
    jobConfig = new JobConfig();
    jobConfig.addClass(Person.class);
    jobConfig.addClass(HzJetListClientPersonMultipleGroupBy.class);
    Job job = client.newJob(p, jobConfig);
    job.join();

Затем мы читаем с карты в клиенте и уничтожаем ее.

Сообщение об ошибке на сервере:

Причина: java.lang.ClassCastException: java.util.HashMap нельзя преобразовать в java.util.Map $ Entry

1 Ответ

2 голосов
/ 01 июля 2019

groupingBy агрегирует все элементы ввода в HashMap, где ключ извлекается с использованием данной функции.В вашем случае он объединяет поток Person элементов в один HashMap<String, List<Person>> элемент.

Вам необходимо использовать это:

        p.drawFrom(Sources.<Person>list("inputList"))
         .groupingKey(Person::getCountry)
         .aggregate(AggregateOperations.toList())
         .drainTo(Sinks.map(res));

Это заполнит карту resсписок людей в каждом городе.

Помните, что без groupingKey() агрегация всегда глобальна.То есть все элементы ввода будут объединены в один элемент вывода.

...