LinkedBlockingQueue и рабочие потоки - безопасен ли этот поток кода? - PullRequest
0 голосов
/ 18 октября 2019

Я пытаюсь понять, является ли нижеследующее потокобезопасным, оно было написано другим разработчиком, чей код я унаследовал и больше не с нами.

У меня есть класс BaseProvider, который на самом деле является кэшем сообщений, представленный LinkedBlockingQueue. Этот класс хранит входящие сообщения в очереди.

У меня есть набор рабочих потоков, которые читают эту очередь. Таким образом, LinkedBlockingQueue является поточно-ориентированным.

Вопросы 1. Когда рабочий поток вызывает функциюr.r.NetxtQueuedItem (), провайдер просматривает элемент за элементом и добавляет его в список и возвращает список сообщений. Пока он это делает, что произойдет, если в класс провайдера добавлено сообщение с вызовом addToQueue? Внутренний ли takeLock в LinkedBlockingQueue препятствует добавлению нового сообщения в очередь, пока все сообщения не будут удалены из очереди?

Как вы могли заметить, каждый рабочий поток имеет доступ ко всем поставщикам, поэтому, пока один рабочий поток проходит через всех поставщиков и вызывает getNextQueuedItem (), что происходит, когда другой рабочий поток также вызывает через всех поставщикови вызывает getNextQueuedItem ()? Будут ли оба рабочих потока перешагивать друг друга?

открытый абстрактный класс BaseProvider реализует IProvider {

 private LinkedBlockingQueue<CoreMessage> internalQueue = new LinkedBlockingQueue<CoreMessage>();

@Override
public synchronized List<CoreMessage> getNextQueuedItem() {
    List<CoreMessage> arrMessages = new ArrayList<CoreMessage>();                    
    if (internalQueue.size() > 0) {
        Logger.debug("Queue has entries");    
        CoreMessage msg = null;
        try {
            msg = internalQueue.take();
        } catch (InterruptedException e) {
            Logger.warn("Interruption");
            e.printStackTrace();
        }
        if (msg != null) {
            arrMessages.add(msg);
        }
    }
    return arrMessages;
}

protected synchronized void addToQueue(CoreMessage message) {
    try {
        internalQueue.put(message);
    } catch (InterruptedException e) {
        Logger.error("Exception adding message to queue " + message);
    }
}

}

// Естьнабор рабочих потоков, которые читают эти очереди

public class Worker implements Runnable 
 @Override
 public void run() {
    Logger.info("Worker - Running Thread : " + Thread.currentThread().getName());

    while (!stopRequested) {
        boolean processedMessage = false;
        for (IProvider provider : providers) {
            List<CoreMessage> messages = provider.getNextQueuedItem();
            if (messages == null || messages.size() != 0) {
                processedMessage = true;
                for (CoreMessage message : messages) {
                    final Message msg = createEndurMessage(provider, message);
                    processMessage(msg);
                    message.commit();
                }
            }
        }
        if (!(processedMessage || stopRequested)) {
            // this is to stop the thread from spinning when there are no messages
            try {
                Thread.sleep(WAIT_INTERVAL);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

} ​​

1 Ответ

0 голосов
/ 18 октября 2019

что произойдет, если в класс провайдера добавлено сообщение с вызовом addToQueue?

getNextQueuedItem() и addToQueue(...) оба являются synchronized методами. Если это единственные два метода, которые обращаются к private ... internalQueue, то нет никакого способа, которым несколько потоков могли бы когда-либо одновременно обращаться к internalQueue.

, пока один рабочий поток проходит черезвсе провайдеры и вызовы getNextQueuedItem (), что происходит, когда другой рабочий поток также вызывает всех провайдеров и вызывает getNextQueuedItem ()?

Вы спрашиваете, есть ли у нескольких работников доступ к одному и тому же провайдеру? Этого не может быть, потому что getNextQueuedItem() - это synchronized метод.

- ИЛИ -

Вы спрашиваете, как разные работники обращаются к разным провайдерам? Это не должно иметь значения (по крайней мере, не в том, что касается класса BaseProvider), потому что, похоже, нет никакого способа, которым различные объекты могли бы быть связаны друг с другом.

...