Как обеспечить обработку заказа в параллельных потоках? - PullRequest
0 голосов
/ 16 апреля 2020

Я зацикливался, чтобы обеспечить порядок в параллельных потоках на основе некоторого значения поля. Может быть, я приведу некоторый абстрактный пример:

Допустим, у нас есть класс User

@Lombok.Data()
class User {
   private String firstName;
   private String lastName;
   private int someValue;
   private int priority;
}

, и у нас есть список этих пользователей:

List<User> users = someInitUsersFunction();

Я хочу форсировать поток parellel для обработки каждого пользователя по приоритету, скажем, у нас есть 100 пользователей с приоритетом 0, 100 пользователей с приоритетом 1 и 100 пользователей с приоритетом 2.

Я хочу запускать пользователей процесса с приоритетом 2 только при приоритете 1 сделано, и когда приоритет 0 сделан.

Я думаю

mvn install -T 4

может быть подход, который я ищу ( сначала построить независимые модули). Возможно ли это сделать в java потоках? Также возможно использование альтернатив.

Мой подход состоит в том, чтобы разделить на указанный c список по приоритету, а затем обработать список по списку

Ответы [ 3 ]

1 голос
/ 16 апреля 2020

Чтобы обработать пользователей в блоках по приоритету, но параллельно обрабатывать пользователей с одинаковым приоритетом, сначала сгруппируйте пользователей по приоритету, затем обработайте каждую группу отдельно.

users.stream().collect(Collectors.groupingBy(User::getPriority, TreeMap::new, Collectors.toList()))
        .values().stream()
        .forEachOrdered(list -> // sequential, in priority order
            list.parallelStream().forEach(user -> { // parallel, unordered
                // process user here
            }));

Без вложенных потоков и прокомментировал для ясности:

// Group users by priority
TreeMap<Integer, List<User>> usersByPriority = users.stream()
        .collect(Collectors.groupingBy(User::getPriority, TreeMap::new, Collectors.toList()));

// Process groups in priority order
for (List<User> list : usersByPriority.values()) {

    // Process users of current priority in parallel
    list.parallelStream().forEach(user -> {
        // process user here
    });

    // We won't loop to next priority until all users with current priority has been processed
}
0 голосов
/ 16 апреля 2020

Если вы хотите обработать первые 100 единиц с приоритетом 1, то последние 100 и т. Д., Они не являются ни параллельными, ни параллельными, а фактически последовательными. Эти частичные подсписки могут обрабатываться параллельно. A PriotityQueue или SortedMap являются способами go.

SortedMap:

Используйте реализацию TreeSet внутри Collectors.groupingBy метод:

Map<Integer, List<User>> map = users.stream()
    .collect(Collectors.groupingBy(
         User::getPriority,
         TreeMap::new,
         Collectors.toList()));

Карта отсортирована по приоритету (ключу).

PriorityQueue:

  • Группировать по * От 1024 * до Map<Integer, List<User>> для группировки подсписков по приоритету
  • Добавьте в PriorityQueue сравнение по приоритету
  • Параллельно обрабатывайте списки, опрашиваемые последовательно

Начните с группировки:

Map<Integer, List<User>> map = users.stream().collect(Collectors.groupingBy(User::getPriority);

На данный момент карта будет выглядеть следующим образом:

[User(firstName=C, lastName=C, someValue=0, priority=1)]
[User(firstName=A, lastName=A, someValue=0, priority=2)]
[User(firstName=B, lastName=B, someValue=0, priority=3), User(firstName=D, lastName=D, someValue=0, priority=3)]

Создать PriorityQueue из карты:

 Queue<List<User>> queue = map.entrySet()
        .stream()
        .collect(
            () -> new PriorityQueue<>(Comparator.comparingInt(list -> list.get(0).getPriority())),
            (pq, entry) -> pq.add(entry.getValue()),
            AbstractQueue::addAll);

Итерация по очереди учитывает приоритет, и подмножество может обрабатываться параллельно, поскольку они имеют одинаковый приоритет.

for (List<User> users : queue) {
    users.stream().parallel()...
}
0 голосов
/ 16 апреля 2020

Sequential vs parallel - это не то же самое, что ordering.

Если у вас есть заказанный поток и вы выполняете некоторые операции, которые гарантируют поддержание порядка, не важно, обрабатывается ли поток параллельно или последовательный; реализация должна сохранять порядок.

В вашем случае (только если ваш список не соответствует порядку, который вы хотите ..) вы можете использовать спецификацию c Comparator или позволить User Implements Comparable основано на priority поле. Затем сортируйте свой список перед началом выполнения других потоковых операций. Параллельное или последовательное не даст другого результата

Или с использованием указанных c типов коллекций, таких как SortedSet или PriorityQueue

Некоторое внимание относительно PriorityQueue Итератор, предоставленный в итераторе метода () не гарантируется прохождение элементов PriorityBlockingQueue в каком-либо конкретном порядке.

Таким образом, вам необходимо отсортировать элемент во время потоковой передачи, чтобы сохранить порядок -> stream().sorted() или просто использовать метод poll для него .

...