java параллелизм - поддерживать порядок между определенными задачами - PullRequest
3 голосов
/ 05 мая 2020

Мое приложение получает сообщения от нескольких пользователей. Сообщения для каждого пользователя должны выполняться по порядку, в противном случае сообщения могут выполняться параллельно. Как реализовать такой лог c?

Например, сообщения приходят в таком порядке: u1:m1 u1:m2 u2:m1 u1:m3 u2:m2. Выполнение должно быть параллельным, например:

  • thread1: u1:m1 u1:m2 u1:m3
  • thread2: u2:m1 u2:m2

Количество пользователей может быть огромным, поэтому я не могу просто создать однопотоковый исполнитель для каждого пользователя.

private ExcutorService executorService = newFixedThreadPool(10);

public void onMessage(String user, String message) {
  // TODO schedule tasks per user in order
  executorService.schedule(() -> processMessage(message));
}

Ответы [ 2 ]

3 голосов
/ 06 мая 2020

Вы можете сделать это так:

        //CREATE EXECUTORS
        int numberOfThreads = 10;
        ExecutorService[] executors = new ExecutorService[numberOfThreads];
        for (int i = 0; i < numberOfThreads; i++) {
            executors[i] = Executors.newSingleThreadExecutor();
        }

Затем в вашем методе используйте специфицированный c исполнитель для пользователя с помощью hashCode и по модулю:

public void onMessage(String user, String message) {
    //same user will always get the same executor, hashCode will evenly distribute the load among the executors
    int executorToUse = Math.abs(user.hashCode()) % numberOfThreads; 
    ExecutorService executorService = executors[executorToUse];
    executorService.execute(() -> processMessage(message));
}
0 голосов
/ 06 мая 2020

Вы можете разделить производителя и обработчик сообщений по типу очереди группировки:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class GroupingBlockingQueue<G, E> {

    private static final int UNBOUNDED = -1;

    private final Lock lock = new ReentrantLock();

    private final Condition notFull  = lock.newCondition();

    private final Condition notEmpty = lock.newCondition();

    private final Map<G, List<E>> map = new LinkedHashMap<>();

    private final int bound;

    public GroupingBlockingQueue() {
        this(UNBOUNDED);
    }

    public GroupingBlockingQueue(int bound) {
        this.bound = bound;
    }

    public void put(G group, E element) throws InterruptedException {
        lock.lock();
        try {
            if (bound > 0) {
                while (bound == map.keySet().size()) {
                    notFull.await();
                }
            }
            map.computeIfAbsent(group, k -> new ArrayList<>()).add(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Item<G, E> take() throws InterruptedException {
        lock.lock();
        try {
            Set<G> keys = map.keySet();
            while (keys.isEmpty()) {
                notEmpty.await();
            }
            G group = keys.iterator().next();
            List<E> elements = map.remove(group);
            notFull.signal();
            return new Item<>(group, elements);
        } finally {
            lock.unlock();
        }
    }

    public static class Item<G, E> {
        private final G group;
        private final List<E> elements;

        public Item(G group, List<E> elements) {
            this.group = group;
            this.elements = elements;
        }

        public G getGroup() {
            return group;
        }

        public List<E> getElements() {
            return elements;
        }
    }
}

Тогда обработчик сообщений может выглядеть так:

package ru.dkovalev;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.Executors.newFixedThreadPool;

public class UserMessageProcessor implements AutoCloseable {

    private GroupingBlockingQueue<String, String> queue = new GroupingBlockingQueue<>();
    private ExecutorService executorService;

    public UserMessageProcessor(int threadCount) {
        executorService = newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            executorService.submit((Runnable) this::processMessages);
        }
    }

    private void processMessages() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                GroupingBlockingQueue.Item<String, String> item = queue.take();
                processMessages(item.getGroup(), item.getElements());
            }
        } catch (InterruptedException ignore) {}
    }

    private void processMessages(String user, List<String> messages) throws InterruptedException {
        for (String message : messages) {
            System.out.println(String.format("[%s] %s:%s", Thread.currentThread().getName(), user, message));
            Thread.sleep(TimeUnit.SECONDS.toMillis(1));
        }
    }

    public void onMessage(String user, String message) throws InterruptedException {
        queue.put(user, message);
    }

    @Override
    public void close() {
        try {
            executorService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ignore) {
        }
        executorService.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        try (UserMessageProcessor processor = new UserMessageProcessor(10)) {
            processor.onMessage("u1", "m1");
            processor.onMessage("u2", "m1");
            processor.onMessage("u1", "m2");
            processor.onMessage("u2", "m2");
            processor.onMessage("u1", "m3");
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...