Java Stream Group по строке, когда он извлекается из базы данных - PullRequest
0 голосов
/ 18 мая 2018

Допустим, у меня есть этот кусок кода.Насколько я знаю, приведенный ниже код работает следующим образом: если у меня есть 10 запросов и я выполняю их одновременно, а каждый запрос возвращает 10 миллионов результатов, мне нужно подождать 100 миллионов строк, извлеченных из базы данных, чтобы запустить функцию группы.

Моя проблема, так как количество декартовых произведений Country и City мало, а количество строк, которые я должен получить из базы данных, огромно.Я хочу сразу вычислить результат группы, когда строка извлечена из базы данных.Как я могу сделать это с помощью Java Stream?

  myqueries
 .parallelstream()
 .map( m-> { 
    //queryresult is a stream which return database rows
    return queryresult;
 })
 .flatMap(fm-> fm)
 .collect(Collectors.groupingBy(g-> {
                    List<Object> objects = Arrays.<Object>asList(
                    g.getCountry(),
                    g.getCity());
                    return objects;
                }, Collectors.toList()))


                .entrySet().stream().map(m-> {
                    MyResultClass item = new MyResultClass();
                    item.setCountry((String) m.getKey().get(0));
                    item.setCity((String) m.getKey().get(1));
                    item.setSumField1(m.getValue().stream().mapToDouble(m2-> m2.getSumField1()).sum());
                    item.setSumField2(m.getValue().stream().mapToDouble(m2-> m2.getSumField2()).sum());
                    item.setSumField3(m.getValue().stream().mapToDouble(m2-> m2.getSumField3()).sum());                 
                    return item;
                 }).forEach(f-> {

                //print the MyResultClass fields
        });

1 Ответ

0 голосов
/ 18 мая 2018

Проблема с вашим решением состоит в том, что вы собираете все данные в список, просто для дальнейшего сокращения.Таким образом, он будет накапливать все данные в памяти.Вы можете объединить оба сокращения в одно, используя toMap следующим образом:

myqueries
 .parallelstream()
 .flatMap( m-> { 
    //queryresult is a stream which return database rows
    return queryresult;
 })
 .collect(Collectors.toMap(
               g-> Arrays.<Object>asList(g.getCountry(), g.getCity()),
               v -> { 
                      MyResultClass item = new MyResultClass();
                      item.setCountry(v.getCountry());
                      item.setCity(v.getCity());
                      return item;
                    },
                (t, u) -> {
                       t.setSumField1(t.getSumField1() + u.getSumField1());
                       t.setSumField2(t.getSumField2() + u.getSumField3());
                       t.setSumField3(t.getSumField3() + u.getSumField3());
                       return t;
                      }
                 )
                .values().forEach(f-> {

                //print the MyResultClass fields
                });

Также обратите внимание, что, когда вы используете здесь ParallelsStream, это не означает, что все запросы будут выполняться параллельно,Параллельность будет зависеть от количества запросов, количества ядер в вашей машине и среды выполнения.Если вы хотите контролировать поведение одновременного запроса, лучше использовать ExecutorService .

Еще один момент, на который следует обратить внимание: выполнение также будет зависеть от того, как вы создаете поток из результата запроса.,Если вы подождете, пока не получите весь результат, а затем создадите Stream, тогда вы победите цель самого вопроса.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...