Координация нескольких одновременных очередей - PullRequest
4 голосов
/ 06 января 2011

В настоящее время у меня есть параллельная реализация очереди, которая использует BlockingQueue в качестве хранилища данных. Теперь мне нужно представить объект второго типа с более высоким приоритетом, что приведет меня к очереди с голоданием / приоритетом для исходной очереди. Итак, мы работаем с объектами типа A и типа B, которые создаются из нескольких потоков. Любые объекты типа B должны быть обработаны до объектов типа A, но ДОЛЖНО поддерживаться, кроме этого порядка FIFO. Поэтому, если {1A, 1B, 2A, 3A, 2B} вставлены, порядок должен быть {1B, 2B, 1A, 2A, 3A}

Я пытался одним PriorityBlockingQueue выдвинуть тип B вперед, но я не смог выполнить требование FIFO (нет естественного порядка между элементами одного типа).

Моя следующая мысль - использовать две параллельные очереди. Я ищу общие ошибки или соображения при координации доступа между двумя очередями. В идеале я бы хотел сделать что-то вроде этого:

   public void add(A a)
   {
       aQueue.add(a);
   }
   public void add(B b)
   {
       bQueue.add(b);
   }

   private void consume() 
   {
       if(!bQueue.isEmpty())
          process(bQueue.poll());
       else if(!aQueue.isEmpty())
          process(aQueue.poll());
   }

Нужна ли мне какая-либо синхронизация или блокировки, если обе очереди ConcurrentLinkedQueue (или вставить более подходящую структуру здесь)? Примечание. У меня много производителей, но только один потребитель (однопоточный ThreadPoolExecutor).

РЕДАКТИРОВАТЬ: Если после проверки isEmpty () входит B, то нормально обрабатывать A и обрабатывать его при следующем вызове потребление ().

Ответы [ 3 ]

3 голосов
/ 06 января 2011

Я не уверен, правильно ли я понял вашу ситуацию, но я думаю, что можно решить эту проблему, используя одну очередь.

Вы сказали, что ваши объекты (в очереди) должны быть сопоставимы по естественному порядку и типу. Если нет естественного порядка, просто используйте генератор последовательности (т.е. AtomicLong), который даст вашим объектам уникальный, постоянно увеличивающийся идентификатор очереди. Получение данных из AtomicLong не займет много времени, если вы не в мире наносекунд.

Так что ваш Comparator.compare должен быть таким:

1) проверить тип объекта. Если он отличается (A VS B), верните 1 / -1. В противном случае см. Ниже
2) проверьте идентификатор. Это гарантированно будет другим.

Если вы не можете изменить объекты (A и B), вы все равно можете заключить их в другой объект, который будет содержать этот идентификатор.

3 голосов
/ 06 января 2011

Исходя из ваших требований, ваш метод должен работать нормально.

Я бы внес следующие изменения в ваш код.Это блокирует ваш getNextItem (), пока одна из очередей не вернет вам объект.

private Object block = new Object();

public void add(A a)
{
    synchronized( block )
    {
        aQueue.add( a );
        block.notifyAll();
    }
}

public void add(B b)
{
    synchronized( block )
    {
        bQueue.add( b );
        block.notifyAll();
    }
}

private Object consume() 
{
    Object value = null
    synchroinzed( block )
    {
        while ( return == null )
        {
            value = bQueue.poll();
            if ( value == null ) value = aQueue.poll();
            if ( value == null ) block.wait();
        }
     }

   return value;
}
1 голос
/ 06 января 2011

Вам необходимо синхронизировать метод getQueueItem ().

С A и B, реализующим интерфейс QueueItem.

   public void add(A a)
   {
       aQueue.add(a);
   }
   public void add(B b)
   {
       bQueue.add(b);
   }

   private void consume() 
   {
       process(getNextItem());
   }

    private QueueItem getNextItem()
    {
       synchronized(bQueue) {
          if(!bQueue.isEmpty()) return bQueue.poll();
          return aQueue.poll();
       }
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...