У меня есть рабочие потоки, которые получают сообщения от разных классов провайдеров. Каждый класс провайдера добавляет / принимает сообщения внутренней очереди. Каждый поставщик обслуживает только одну очередь утешения, а потребитель утешения добавляет сообщения поставщику очереди.
Несколько работников могут принимать сообщения провайдера, обрабатывать их и затем отправлять подтверждение для сообщения (метод message.commit () ниже выполняет подтверждение).
Сценарий
- Рабочий1 получает сообщение1 для обработки от провайдера1
- Рабочий2 получает сообщение2 для обработки от провайдера1
- Worker2 завершает работу перед worker1, поэтому отправляет подтверждение для сообщения2
ВОПРОС
- Будет ли message2 все еще сидеть в очереди утешения и ждатьсообщение1 будет подтверждено или сообщение2 будет удалено из очереди, несмотря на то, что сообщение1 еще не подтверждено?
- Что происходит на оборудовании для утешения при получении подтверждения? Сообщение2 полностью удалено, как тогда поддерживается порядок очереди?
Класс провайдера
public abstract class BaseProvider implements IProvider {
private LinkedBlockingQueue<CoreMessage> internalQueue = new LinkedBlockingQueue<CoreMessage>();
@Override
public synchronized List<CoreMessage> getNextQueuedItem() {
List<CoreMessage> arrMessages = new ArrayList<CoreMessage>();
if (internalQueue.size() > 0) {
Logger.debug("Queue has entries");
CoreMessage msg = null;
try {
msg = internalQueue.take();
} catch (InterruptedException e) {
Logger.warn("Interruption");
e.printStackTrace();
}
if (msg != null) {
arrMessages.add(msg);
}
}
return arrMessages;
}
protected synchronized void addToQueue(CoreMessage message) {
try {
internalQueue.put(message);
} catch (InterruptedException e) {
Logger.error("Exception adding message to queue " + message);
}
}
}
// Существует набор рабочих потоков, которые читают этиочереди
public class Worker implements Runnable
@Override
public void run() {
Logger.info("Worker - Running Thread : " + Thread.currentThread().getName());
while (!stopRequested) {
boolean processedMessage = false;
for (IProvider provider : providers) {
List<CoreMessage> messages = provider.getNextQueuedItem();
if (messages == null || messages.size() != 0) {
processedMessage = true;
for (CoreMessage message : messages) {
final Message msg = createEndurMessage(provider, message);
processMessage(msg);
message.commit();
}
}
}
if (!(processedMessage || stopRequested)) {
// this is to stop the thread from spinning when there are no messages
try {
Thread.sleep(WAIT_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}