Проблема многопоточности Java с использованием ConcurrentLinkedQueue - PullRequest
1 голос
/ 16 апреля 2011

У меня проблема со следующим фрагментом кода.Он предназначен для обработки событий (предоставляемых с помощью вызовов метода processEvent), которые добавляются в очередь событий (ConcurrentLinkedQueue).События добавляются в очередь событий и периодически обрабатываются в методе run.

Все хорошо почти всегда.Но иногда после вызова метода processEvent, когда событие добавляется в очередь, часть выполнения не видит новое событие.

Есть идеи о том, что не так?Помимо очевидной ошибки в использовании строковой константы в качестве блокировки?

import java.util.concurrent.ConcurrentLinkedQueue;

public class MyCommunicator implements Runnable {

private ConcurrentLinkedQueue<MyEvent> eventQueue = null;

private boolean stopped = false;

private String lock = "";
private Thread thread = null;

public MyCommunicator() {

    eventQueue = new ConcurrentLinkedQueue<MyEvent>();
}

public void start() {
    thread = new Thread(this, "MyCommunicatorThread");
    thread.start();
}

public void stop() {
    stopped = true;
    synchronized (lock) {
        lock.notifyAll();
    }
    eventQueue.clear();
}

public void run() {
    while (!stopped) {
        try {

            MyEvent event = null;
            while (!stopped && ((event = eventQueue.peek()) != null)) {
                sendEvent(event);
                eventQueue.poll();
            }

            if (!stopped) {
                synchronized (lock) {
                    lock.wait(10000L);
                }
            }
        }

        catch (Exception e) {

        }
    }
}

/**
 * START EVENT JOB - ADD A NEW EVENT TO BE PROCESSED
 */
public void processEvent(MyEvent event) {
    eventQueue.offer(event);
    synchronized (lock) {
        lock.notifyAll();
    }
}

/**
 * END EVENT JOB
 */
private void sendEvent(MyEvent event) {
    // do send event job
}

}

Ответы [ 4 ]

11 голосов
/ 16 апреля 2011

Почему вы используете блокировки и уведомления?

Используйте взамен LinkedBlockingQueue и избавьте себя от всех хлопот.

Это с тайм-аутом на poll() сделает все, что вы пытаетесь сделать.


Редактировать: В отношении текущего кода;

Вам необходимо определить "не видит, что есть новое событие" . Ваш метод run() просматривает очередь каждые 10 секунд; если в очереди что-то есть, оно «увидит» и вытянет.

  • Если вы имеете в виду «Он не видит это сразу после уведомления, только через 10 секунд», тогда на этот вопрос довольно легко ответить, так как у вас есть состояние гонки, которое может легко вызвать это. Что-то может быть вставлено в очередь, пока этот поток находится в промежутке между завершением проверки / обработки очереди и получением блокировки. Без тайм-аута на wait() вы заблокируете, пока не будет вставлено следующее событие. Если в течение этого времени вызывался метод stop(), вы потеряете все события в очереди. Использование LinkedBlockingQueue вместо всех ненужных блокировок и уведомлений решает эту проблему. Это не «простое» решение, оно правильное для этого случая использования и проблемы.

  • Если это не так, то вы просто ничего не вставляете в очередь, и проблема заключается в коде, который вы не разместили здесь. Вероятно, ничего не зная об этом коде, вы пытаетесь вставить ноль MyEvent в eventQueue.offer(event). Поскольку вы не пытаетесь поймать offer(), вы этого не узнаете. Игнорирование всех исключений и не проверка возвращаемых значений не является ни хорошей идеей, ни практикой.

  • Третья возможность состоит в том, что у вас есть какой-то другой код, который где-то блокирует ту же самую точную внутреннюю строковую литеральную ссылку, что приведет к зависанию этого кода. Вы упоминаете об этом, но я повторю здесь - это ДЕЙСТВИТЕЛЬНО плохая вещь, особенно если учесть, что это пустая строка. Пакет java.util.concurrent предоставляет реальные замки с условиями , если вы настаиваете на их использовании здесь. Обратите внимание, что это по-прежнему не устранит состояние гонки в связи с тем, что вы иногда пропускаете событие на 10 секунд, но, по крайней мере, оно будет чище. Чтобы устранить состояние вашей гонки, вы должны отбросить параллельную очередь для обычной и просто получить блокировку перед доступом к ней (а также получить блокировку для вставок). Это синхронизирует ваши потоки, так как вставителю будет запрещено вставлять, если этот поток не ожидает состояния блокировки. Смешивание подходов с блокировкой и без блокировок для синхронизации потоков в одном и том же фрагменте кода часто приводит к этим проблемам.

5 голосов
/ 17 апреля 2011

У вас есть так называемый пропущенный сигнал .Вы опрашиваете очередь, а затем ждете на мониторе (взяв блокировку).Потоки производителя добавляют события и затем вызывают notifyAll() (взяв блокировку).Нет никакой связи между очередью / опросом событий и условным ожиданием / уведомлением.

Поэтому поток А может опрашивать, пока он пуст, а затем пытаться получить блокировку, а поток В добавляет элемент.и получает блокировку, уведомляя все ожидающие потоки, затем снимая блокировку.Затем поток A получает блокировку и ожидает ее, но сигнал был пропущен.

Поскольку вы используете блокировку исключительно для сигнализации, вы можете рассмотреть другой механизм, такой как многоразовая защелка, например, новый jdk7 * 1009 Дуга Ли.* Phaser , или просто используйте BlockingQueue напрямую.

В качестве альтернативы у нас есть пара ReusableLatch , например BooleanLatch для одного потока чтения или PhasedLatch для многопартийной поддержки.

0 голосов
/ 05 апреля 2012

У меня возникла проблема с ConcurrentLinkedQueue, которую я действительно подозреваю как подлинную ошибку, потому что она не полностью синхронизирована.

Я еще не полностью проверил это, но я посмотрел накод, и я совершенно уверен, что .isEmpty () не синхронизируется, если очередь на самом деле пуста.Хотя один поток вызывает .isEmpty () и возвращается true, очередь может уже содержать элементы.

0 голосов
/ 16 апреля 2011

На первый взгляд нет конкретной идеи, но любое количество вещей может пойти не так без вашего ведома, из-за этого:

    catch (Exception e) {

    }

Обработчик, который перехватывает любые Exception (включая RuntimeException)и его различные подклассы), а затем игнорирует это, как правило, плохая идея.Если это предназначено для перехвата определенного типа исключения (например, InterruptedException, который может быть выдан lock.wait()), то вы должны ограничить его этим типом исключения.Если у вас есть какая-то причина для перехвата какого-либо исключения, вы должны хотя бы записать что-то в журнал при возникновении исключения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...