Вы можете разделить производителя и обработчик сообщений по типу очереди группировки:
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class GroupingBlockingQueue<G, E> {
private static final int UNBOUNDED = -1;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Map<G, List<E>> map = new LinkedHashMap<>();
private final int bound;
public GroupingBlockingQueue() {
this(UNBOUNDED);
}
public GroupingBlockingQueue(int bound) {
this.bound = bound;
}
public void put(G group, E element) throws InterruptedException {
lock.lock();
try {
if (bound > 0) {
while (bound == map.keySet().size()) {
notFull.await();
}
}
map.computeIfAbsent(group, k -> new ArrayList<>()).add(element);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Item<G, E> take() throws InterruptedException {
lock.lock();
try {
Set<G> keys = map.keySet();
while (keys.isEmpty()) {
notEmpty.await();
}
G group = keys.iterator().next();
List<E> elements = map.remove(group);
notFull.signal();
return new Item<>(group, elements);
} finally {
lock.unlock();
}
}
public static class Item<G, E> {
private final G group;
private final List<E> elements;
public Item(G group, List<E> elements) {
this.group = group;
this.elements = elements;
}
public G getGroup() {
return group;
}
public List<E> getElements() {
return elements;
}
}
}
Тогда обработчик сообщений может выглядеть так:
package ru.dkovalev;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newFixedThreadPool;
public class UserMessageProcessor implements AutoCloseable {
private GroupingBlockingQueue<String, String> queue = new GroupingBlockingQueue<>();
private ExecutorService executorService;
public UserMessageProcessor(int threadCount) {
executorService = newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit((Runnable) this::processMessages);
}
}
private void processMessages() {
try {
while (!Thread.currentThread().isInterrupted()) {
GroupingBlockingQueue.Item<String, String> item = queue.take();
processMessages(item.getGroup(), item.getElements());
}
} catch (InterruptedException ignore) {}
}
private void processMessages(String user, List<String> messages) throws InterruptedException {
for (String message : messages) {
System.out.println(String.format("[%s] %s:%s", Thread.currentThread().getName(), user, message));
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
}
}
public void onMessage(String user, String message) throws InterruptedException {
queue.put(user, message);
}
@Override
public void close() {
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException ignore) {
}
executorService.shutdown();
}
public static void main(String[] args) throws InterruptedException {
try (UserMessageProcessor processor = new UserMessageProcessor(10)) {
processor.onMessage("u1", "m1");
processor.onMessage("u2", "m1");
processor.onMessage("u1", "m2");
processor.onMessage("u2", "m2");
processor.onMessage("u1", "m3");
}
}
}