Наблюдатель - BlockingQueue - PullRequest
       21

Наблюдатель - BlockingQueue

2 голосов
/ 07 сентября 2011

Я использую шаблон наблюдателя и BlockingQueue, чтобы добавить несколько экземпляров. Теперь в другом методе я использую очередь, но кажется, что take () ждет вечно, хотя я делаю это так:

/** {@inheritDoc} */
@Override
public void diffListener(final EDiff paramDiff, final IStructuralItem paramNewNode,
    final IStructuralItem paramOldNode, final DiffDepth paramDepth) {
    final Diff diff =
        new Diff(paramDiff, paramNewNode.getNodeKey(), paramOldNode.getNodeKey(), paramDepth);
    mDiffs.add(diff);
    try {
        mDiffQueue.put(diff);
    } catch (final InterruptedException e) {
        LOGWRAPPER.error(e.getMessage(), e);
    }
    mEntries++;

    if (mEntries == AFTER_COUNT_DIFFS) {
        try {
            mRunner.run(new PopulateDatabase(mDiffDatabase, mDiffs));
        } catch (final Exception e) {
            LOGWRAPPER.error(e.getMessage(), e);
        }
        mEntries = 0;
        mDiffs = new LinkedList<>();
    }
}

/** {@inheritDoc} */
@Override
public void diffDone() {
    try {
        mRunner.run(new PopulateDatabase(mDiffDatabase, mDiffs));
    } catch (final Exception e) {
        LOGWRAPPER.error(e.getMessage(), e);
    }
    mDone = true;
}

тогда как mDiffQueue является LinkedBlockingQueue, и я использую его так:

while (!(mDiffQueue.isEmpty() && mDone) || mDiffQueue.take().getDiff() == EDiff.INSERTED) {}

Но я думаю, что первое выражение проверено, тогда как mDone неверно, тогда, возможно, mDone установлено в true (наблюдатель всегда многопоточный?), Но он уже вызывает mDiffQueue.take ()? : - /

Редактировать: Я действительно не понимаю сейчас. Я недавно изменил это на:

synchronized (mDiffQueue) {
    while (!(mDiffQueue.isEmpty() && mDone)) {
        if (mDiffQueue.take().getDiff() != EDiff.INSERTED) {
            break;
        }
    }
}

Если я немного подожду в отладчике, он будет работать, но он также должен работать в «реальном времени», поскольку mDone инициализируется как false, и, следовательно, условие while должно быть истинным, а тело должно выполняться.

Если mDiffQueue пусто, а mDone равно true, оно должно пропустить тело цикла while (что означает, что очередь больше не заполняется).

Редактировать: Кажется, это:

synchronized (mDiffQueue) {
    while (!(mDiffQueue.isEmpty() && mDone)) {
         if (mDiffQueue.peek() != null) {
             if (mDiffQueue.take().getDiff() != EDiff.INSERTED) {
                 break;
             }
         }
    }
}

Хотя я не понимаю, почему функция peek () обязательна.

Edit:

То, что я делаю, это итерация по дереву, и я хочу пропустить все INSERTED узлы:

for (final AbsAxis axis = new DescendantAxis(paramRtx, true); axis.hasNext(); axis.next()) {
    skipInserts();
    final IStructuralItem node = paramRtx.getStructuralNode();
    if (node.hasFirstChild()) {
        depth++;
        skipInserts();
        ...

По сути, вычисление максимальной глубины или уровня в дереве без учета узлов, которые были удалены в другой ревизии дерева (для сравнения Sunburst-визуализации), но все в порядке, это, возможно, выходит за рамки. Просто чтобы проиллюстрировать, что я делаю что-то с узлами, которые не были вставлены, даже если это просто настройка максимальной глубины.

С уважением,

Johannes

Ответы [ 2 ]

2 голосов
/ 07 сентября 2011

take() - это «блокирующий вызов». Это означает, что он будет блокировать (ждать вечно), пока что-то не окажется в очереди, а затем вернет то, что было добавлено. Конечно, если что-то находится в очереди, оно немедленно вернется.

Вы можете использовать peek() для возврата того, что будет возвращено take() - то есть peek() возвращает следующий элемент без удаления его из очереди, или возвращает null, если в очереди ничего нет. Попробуйте использовать peek() вместо этого в своем тесте (но также проверьте на ноль).

1 голос
/ 07 сентября 2011

Первый совет: не synchronized (mDiffQueue). Вы бы зашли в тупик, если бы в LinkedBlockingQueue был какой-то synchronized метод; это не тот случай, но вы должны избегать этой практики. Во всяком случае, я не понимаю, почему вы синхронизируете в этот момент.

Вы должны периодически «просыпаться» во время ожидания, чтобы проверить, установлен ли mDone:

while (!(mDiffQueue.isEmpty()  && mDone)) {
   // poll returns null if nothing is added in the queue for 0.1 second.
   Diff diff = mDiffQueue.poll(0.1, TimeUnit.SECONDS); 
   if (diff != null)
      process(diff);
}

Это примерно то же самое, что и peek, но peek в основном вместо этого ожидает наносекунду. Использование peek называется "ожиданием занятости" (ваш поток запускает цикл while без остановок), а использование pool называется "ожиданием полу-занятости" (вы позволяете потоку спать с интервалами).

Полагаю, в вашем случае process(diff) будет выходить из цикла, если diff не относится к типу EDiff.INSERTED. Я не уверен, что это то, что вы пытаетесь достичь. Это кажется странным, поскольку вы в основном просто останавливаете поток потребителя, пока не получите единственный элемент нужного типа, а затем ничего не делаете с ним. И вы не можете получать будущие входящие элементы, поскольку вы находитесь вне цикла while.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...