Я пытаюсь понять, является ли нижеследующее потокобезопасным, оно было написано другим разработчиком, чей код я унаследовал и больше не с нами.
У меня есть класс BaseProvider, который на самом деле является кэшем сообщений, представленный LinkedBlockingQueue. Этот класс хранит входящие сообщения в очереди.
У меня есть набор рабочих потоков, которые читают эту очередь. Таким образом, LinkedBlockingQueue является поточно-ориентированным.
Вопросы 1. Когда рабочий поток вызывает функциюr.r.NetxtQueuedItem (), провайдер просматривает элемент за элементом и добавляет его в список и возвращает список сообщений. Пока он это делает, что произойдет, если в класс провайдера добавлено сообщение с вызовом addToQueue? Внутренний ли takeLock в LinkedBlockingQueue препятствует добавлению нового сообщения в очередь, пока все сообщения не будут удалены из очереди?
Как вы могли заметить, каждый рабочий поток имеет доступ ко всем поставщикам, поэтому, пока один рабочий поток проходит через всех поставщиков и вызывает getNextQueuedItem (), что происходит, когда другой рабочий поток также вызывает через всех поставщикови вызывает getNextQueuedItem ()? Будут ли оба рабочих потока перешагивать друг друга?
открытый абстрактный класс BaseProvider реализует IProvider {
private LinkedBlockingQueue<CoreMessage> internalQueue = new LinkedBlockingQueue<CoreMessage>();
@Override
public synchronized List<CoreMessage> getNextQueuedItem() {
List<CoreMessage> arrMessages = new ArrayList<CoreMessage>();
if (internalQueue.size() > 0) {
Logger.debug("Queue has entries");
CoreMessage msg = null;
try {
msg = internalQueue.take();
} catch (InterruptedException e) {
Logger.warn("Interruption");
e.printStackTrace();
}
if (msg != null) {
arrMessages.add(msg);
}
}
return arrMessages;
}
protected synchronized void addToQueue(CoreMessage message) {
try {
internalQueue.put(message);
} catch (InterruptedException e) {
Logger.error("Exception adding message to queue " + message);
}
}
}
// Естьнабор рабочих потоков, которые читают эти очереди
public class Worker implements Runnable
@Override
public void run() {
Logger.info("Worker - Running Thread : " + Thread.currentThread().getName());
while (!stopRequested) {
boolean processedMessage = false;
for (IProvider provider : providers) {
List<CoreMessage> messages = provider.getNextQueuedItem();
if (messages == null || messages.size() != 0) {
processedMessage = true;
for (CoreMessage message : messages) {
final Message msg = createEndurMessage(provider, message);
processMessage(msg);
message.commit();
}
}
}
if (!(processedMessage || stopRequested)) {
// this is to stop the thread from spinning when there are no messages
try {
Thread.sleep(WAIT_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}