Советы по эффективной блокировке запросов - PullRequest
4 голосов
/ 10 января 2010

Я хотел бы хранить объекты кортежей в параллельной коллекции Java, а затем иметь эффективный блокирующий метод запроса, который возвращает первый элемент, соответствующий шаблону. Если такой элемент недоступен, он будет блокироваться до тех пор, пока такой элемент не появится.

Например, если у меня есть класс:

public class Pair {
  public final String first;
  public final String Second;
  public Pair( String first, String second ) {
    this.first = first;
    this.second = second;
  }
}

А коллекция вроде:

public class FunkyCollection {
  public void add( Pair p ) { /* ... */ }
  public Pair get( Pair p ) { /* ... */ }
}

Я хотел бы запросить его как:

myFunkyCollection.get( new Pair( null, "foo" ) );

, который возвращает первую доступную пару с полем second, равным «foo» или блоками, пока такой элемент не будет добавлен. Другой пример запроса:

myFunkyCollection.get( new Pair( null, null ) );

должен возвращать первую доступную пару независимо от ее значений.

Решение уже существует? Если это не так, что вы предлагаете для реализации метода get( Pair p )?

Уточнение: Метод get( Pair p) также должен удалить элемент. Выбор имени был не очень умным. Лучшее имя будет take( ... ).

Ответы [ 4 ]

3 голосов
/ 10 января 2010

Вот некоторый исходный код. Это в основном то же самое, что сказал cb160, но наличие исходного кода может помочь прояснить любые вопросы, которые у вас могут возникнуть. В частности, методы FunkyCollection должны быть синхронизированы.

Как указывал Меритон, метод get выполняет O (n) -сканирование для каждого заблокированного получения каждый раз, когда добавляется новый объект. Он также выполняет операцию O (n) для удаления объектов. Это можно улучшить, используя структуру данных, аналогичную связанному списку, в котором можно сохранить итератор до последнего проверенного элемента. Я не предоставил исходный код для этой оптимизации, но он не должен быть слишком сложным для реализации, если вам нужна дополнительная производительность.

import java.util.*;

public class BlockingQueries
{
    public class Pair
    {
        public final String first;
        public final String second;
        public Pair(String first, String second)
        {
            this.first = first;
            this.second = second;
        }
    }

    public class FunkyCollection
    {
        final ArrayList<Pair> pairs = new ArrayList<Pair>();

        public synchronized void add( Pair p )
        {
            pairs.add(p);
            notifyAll();
        }

        public synchronized Pair get( Pair p ) throws InterruptedException
        {
            while (true)
            {
                for (Iterator<Pair> i = pairs.iterator(); i.hasNext(); )
                {
                    Pair pair = i.next();
                    boolean firstOk = p.first == null || p.first.equals(pair.first);
                    boolean secondOk = p.second == null || p.second.equals(pair.second);
                    if (firstOk && secondOk)
                    {
                        i.remove();
                        return pair;                
                    }
                }
                wait();
            }
        }   
    }

    class Producer implements Runnable
    {
        private FunkyCollection funkyCollection;

        public Producer(FunkyCollection funkyCollection)
        {
            this.funkyCollection = funkyCollection;
        }

        public void run()
        {
            try
            {
                for (int i = 0; i < 10; ++i)
                {
                    System.out.println("Adding item " + i);
                    funkyCollection.add(new Pair("foo" + i, "bar" + i));
                    Thread.sleep(1000);
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void go() throws InterruptedException
    {
        FunkyCollection funkyCollection = new FunkyCollection();
        new Thread(new Producer(funkyCollection)).start();
        System.out.println("Fetching bar5.");
        funkyCollection.get(new Pair(null, "bar5"));
        System.out.println("Fetching foo2.");
        funkyCollection.get(new Pair("foo2", null));
        System.out.println("Fetching foo8, bar8");
        funkyCollection.get(new Pair("foo8", "bar8"));
        System.out.println("Finished.");
    }

    public static void main(String[] args) throws InterruptedException
    {
        new BlockingQueries().go();
    }
}

Выход:

Fetching bar5.
Adding item 0
Adding item 1
Adding item 2
Adding item 3
Adding item 4
Adding item 5
Fetching foo2.
Fetching foo8, bar8
Adding item 6
Adding item 7
Adding item 8
Finished.
Adding item 9

Обратите внимание, что я поместил все в один исходный файл, чтобы упростить его запуск.

3 голосов
/ 10 января 2010

Я не знаю ни одного контейнера, который бы обеспечивал такое поведение. Одна из проблем, с которой вы сталкиваетесь, - это случай, когда ни одна из существующих записей не соответствует запросу. В этом случае вам придется ждать поступления новых записей, и эти новые записи должны прибыть в конец последовательности. Учитывая, что вы блокируете, вам не нужно проверять все записи, которые предшествуют последнему добавлению, потому что вы уже проверили их и определили, что они не совпадают. Следовательно, вам нужен какой-то способ записать вашу текущую позицию и иметь возможность искать вперед оттуда при поступлении новой записи.

Это ожидание - работа для Condition. Как предлагается в ответе cb160 , вам следует выделить экземпляр Condition внутри вашей коллекции и заблокировать его с помощью Condition#await(). Вы также должны предоставить сопутствующую перегрузку вашему get() методу, чтобы разрешить время ожидания:

public Pair get(Pair p) throws InterruptedException;
public Pair get(Pair p, long time, TimeUnit unit) throws InterruptedException;

При каждом вызове add() вызывайте Condition#signalAll(), чтобы разблокировать потоки, ожидающие неудовлетворенных запросов get(), что позволяет им сканировать последние добавления.

Вы не упомянули, как или если предметы когда-либо удаляются из этого контейнера. Если контейнер только растет, это упрощает то, как потоки могут сканировать его содержимое, не беспокоясь о конфликте со стороны других потоков, мутирующих контейнер. Каждый поток может начать свой запрос с уверенностью относительно количества минимум записей, доступных для проверки. Однако, если вы разрешите удаление предметов, вам придется столкнуться со многими другими проблемами.

2 голосов
/ 10 января 2010

В вашем методе добавления FunkyCollection вы можете вызывать notifyAll для самой коллекции каждый раз, когда добавляете элемент.

Если в методе get нижележащий контейнер (подойдет любой подходящий конкатайнер) не содержит нужного значения, подождите в FunkyCollection. Когда ожидание будет уведомлено, проверьте, содержит ли нижележащий контейнер нужный вам результат. Если это так, верните значение, иначе подождите снова.

1 голос
/ 10 января 2010

Похоже, вы ищете реализацию Tuple Spaces. В статье Википедии о них перечислены несколько реализаций Java, возможно, вы можете использовать одну из них. В противном случае вы можете найти реализацию с открытым исходным кодом для подражания или соответствующие исследовательские работы.

...