Правильный способ объединить группу коллекций - PullRequest
1 голос
/ 20 июня 2019

Я сделал некоторый код для объединения в параллельную группу коллекций, которая содержит пары [String, Integer], Пример

Тема 1 [Автомобиль, 1] [Медведь, 1] [Автомобиль, 1]

Тема 2 [Река, 1] [Автомобиль, 1] [Река, 1]

Результатом должны быть наборы каждой уникальной пары ключей (отсортированные в алфавитном порядке)

[медведь, 1]

[Автомобиль, 1] [Автомобиль, 1] [Автомобиль, 1]

[Река, 1] [Река, 1] [Река, 1]

Мое решение сделать это, как показано ниже, но иногда я не получаю ожидаемый результат, или исключение ConcurrentModificationException выбрасывается из списка, содержащего коллекции результатов

List<Collection<Pair<String, Integer>>> combiningResult = new ArrayList<>();

private void startMappingPhase() throws Exception {
    SimpleDateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
    Invoker invoker = new Invoker(mappingClsPath, "Mapping", "mapper");
    List<Callable<Integer>> tasks = new ArrayList<>();
    for (String line : fileLines) {
        tasks.add(() -> {
            try {
                combine((Collection<Pair<String, Integer>>) invoker.invoke(line));
            } catch (Exception e) {
                e.printStackTrace();
                executor.shutdownNow();
                errorOccurred = true;
                return 0;
            }
            return 1;
        });

        if (errorOccurred)
            Utils.showFatalError("Some error occurred, See log for more detalis");
    }
    long start = System.nanoTime();
    System.out.println(tasks.size() + " Tasks");
    System.out.println("Started at " + formatter.format(new Date()) + "\n");


    executor.invokeAll(tasks);

    long elapsedTime = System.nanoTime() - start;

    partitioningResult.forEach(c -> {
        System.out.println(c.size() + "\n" + c);
    });


    System.out.print("\nFinished in " + (elapsedTime / 1_000_000_000.0) + " milliseconds\n");
}

private void partition(Collection<Pair<String, Integer>> pairs) {

    Set<Pair<String, Integer>> uniquePairs = new LinkedHashSet<>(pairs);

    for (Pair<String, Integer> uniquePair : uniquePairs) {

        int pFrequencyCount = Collections.frequency(pairs, uniquePair);

        Optional<Collection<Pair<String, Integer>>> collResult = combiningResult.stream().filter(c -> c.contains(uniquePair)).findAny();
        if (collResult.isPresent()) {
            collResult.ifPresent(c -> {
                for (int i = 0; i < pFrequencyCount; i++)
                    c.add(uniquePair);
            });
        } else {
            Collection<Pair<String, Integer>> newColl = new ArrayList<>();
            for (int i = 0; i < pFrequencyCount; i++)
                newColl.add(uniquePair);
            combiningResult.add(newColl);
        }

    }
}

Я пытался CopyOnWriteList настаивал на ArrayList, но иногда он получает неполный результат, как

[Автомобиль, 1] [Автомобиль, 1] настаивал на трех записях, мой вопрос

Есть ли способ добиться того, что я пытаюсь сделать, без получения ConcurrentModificationException и неполного результата?

Пример изображения

1 Ответ

0 голосов
/ 20 июня 2019

Если вы пытаетесь изменить одну коллекцию из нескольких потоков, вам нужно добавить синхронизированный блок или использовать один из классов JDK, поддерживающих параллелизм. Обычно они работают лучше, чем синхронизированный блок.

https://docs.oracle.com/javase/tutorial/essential/concurrency/collections.html

...