Аккумулятор не работает должным образом в параллельном потоке - PullRequest
0 голосов
/ 20 января 2019

Я создал коллектор, который может преобразовать поток в карту, в которой ключами являются элементы, которые могут быть куплены определенными клиентами, а имена клиентов - значениями. Моя реализация работает, вероятно, в последовательном потоке, но когда я пытаюсь использоватьparallel это вообще не работает, результирующие наборы всегда содержат одно имя клиента.

List<Customer> customerList = this.mall.getCustomerList();

Supplier<Object> supplier = ConcurrentHashMap<String,Set<String>>::new;

BiConsumer<Object, Customer> accumulator = ((o, customer) -> customer.getWantToBuy().stream().map(Item::getName).forEach(
            item -> ((ConcurrentHashMap<String,Set<String>>)o)
                    .merge(item,new HashSet<String>(Collections.singleton(customer.getName())),
                            (s,s2) -> {
                                HashSet<String> res = new HashSet<>(s);
                                res.addAll(s2);
                                return res;
                            })
    ));

BinaryOperator<Object> combiner = (o,o2) -> {
        ConcurrentHashMap<String,Set<String>> res = new ConcurrentHashMap<>((ConcurrentHashMap<String,Set<String>>)o);
        res.putAll((ConcurrentHashMap<String,Set<String>>)o2);
        return res;
    };

Function<Object, Map<String, Set<String>>> finisher = (o) -> new HashMap<>((ConcurrentHashMap<String,Set<String>>)o);

Collector<Customer, ?, Map<String, Set<String>>> toItemAsKey =
        new CollectorImpl<>(supplier, accumulator, combiner, finisher, EnumSet.of(
            Collector.Characteristics.CONCURRENT,
            Collector.Characteristics.IDENTITY_FINISH));

Map<String, Set<String>> itemMap = customerList.stream().parallel().collect(toItemAsKey);

Конечно, есть проблема в моей accumulator реализации или другой Function, но я не могу это выяснить!кто-нибудь может подсказать, что мне делать?

1 Ответ

0 голосов
/ 20 января 2019

Ваш объединитель неправильно реализован.
Вы перезаписываете все записи, имеющие одинаковый ключ. То, что вы хотите, это добавление значений к существующим ключам.

BinaryOperator<ConcurrentHashMap<String,Set<String>>> combiner = (o,o2) -> {
        ConcurrentHashMap<String,Set<String>> res = new ConcurrentHashMap<>(o);
        o2.forEach((key, set) -> set.forEach(string -> res.computeIfAbsent(key, k -> new HashSet<>())
                                                          .add(string)));
        return res;
    };
...