Спасибо всем ответчикам, которые упомянули Collectors.groupingBy()
.Это было ключом к настройке потока, где я мог бы использовать reduce()
.Я ошибочно полагал, что смогу использовать reduce
сам по себе для решения проблемы, без groupingBy
.
Спасибо также за предложение создать свободный API.Я добавил IncomingFlatItem.getEmailAddress()
и IncomingFlatItem.getReport()
, чтобы свободно захватывать доменные объекты из IncomingFlatItem
, а также метод для преобразования всего плоского элемента в соответствующий объект домена с его электронной почтой и уже вложенным отчетом:
public Client getClient() {
Client client = new Client();
client.setClientCode(clientCode);
client.setClientName(clientName);
client.setEmailAddresses(new ArrayList());
client.getEmailAddresses().add(this.getEmailAddress());
client.setReports(new ArrayList<>());
client.getReports().add(this.getReport());
return client;
}
Я также создал методы .equals()
и .hashCode()
на основе бизнес-идентификаторов для Client
, EmailAddress
и Report
в соответствии с рекомендациями @SamuelPhilip
Наконец, для объектов домена я создал .addReport(Report r)
и .addEmail(EmailAddress e)
в моем Client
классе, который добавил бы дочерний объект к Client
, если его еще нет.Я исключил тип коллекции Set
для List
, потому что стандарт модели домена - List
, а Sets
означало бы много преобразований в Lists
.
Итак, код потока и лямбды выглядят лаконично.
Существует 3 шага:
- map
IncomingFlatItems
to Clients
- группировка
Clients
в карту по клиенту (в значительной степени полагаясь на * 1041)*) - уменьшить каждую группу до одной
Client
Так что это функциональный алгоритм:
List<Client> unflatten(List<IncomingFlatItem> flatItems) {
return flatItems.parallelStream()
.map(IncomingFlatItem::getClient)
.collect(Collectors.groupingByConcurrent(client -> client))
.entrySet().parallelStream()
.map(kvp -> kvp.getValue()
.stream()
.reduce(new Client(),
(client1, client2) -> {
client1.getReports()
.forEach(client2::addReport);
client1.getEmailAddresses()
.forEach(client2::addEmail);
return client2;
}))
.collect(Collectors.toList());
}
У меня ушло много времени из-за выходана касательной, прежде чем я действительно понял reduce
- я нашел решение, которое прошло мои тесты при использовании .stream()
, но полностью провалилось с .parallelStream()
, следовательно, его использование здесь.Я должен был использовать CopyOnWriteArrayList
, иначе он случайно упадет с ConcurrentModificationExceptions