Поиск параллельной коллекции Java - PullRequest
0 голосов
/ 30 августа 2011

Я некоторое время программировал на Java, но плохо знаком с параллельным программированием, так что терпите меня!

Я пытаюсь разработать класс, содержащий группу классов Collection [например, ArrayLists], а затемчтобы найти указанное значение, он обходит все коллекции одновременно, останавливая все потоки, если находит данное значение.

Я вставил свой код ниже, и мне было интересно, если кто-нибудь знает, как в пределах методаве_добавляется_добавление_коллекций ()если один из потоков вернул Фьючерс вернул true?

Спасибо

Грэм

public class CollectionGroup<V> extends ContainerGroup
{
//...
    public boolean contains(V value)
    {
        boolean containsValue = false;
        if (mCollections.size() == 1)
        {
            containsValue = mCollections.get(0).contains(value);
        }
        else
        {
            containsValue = contains_multiple_collections(value);
        }
        return containsValue;
    }

    private boolean contains_multiple_collections(V value)
    {
        // thread pool
        int numberProcessors = mCollections.size();
        ExecutorService es = Executors.newFixedThreadPool(numberProcessors);

        for (int i=0; i<numberProcessors; i++)
        {
            AbstractCollection<V> collection = mCollections.get(i);
            MyCallable callable = new MyCallable(collection,value);
            Future<Boolean> future = es.submit(callable);
            //...
        }

        return true;
    }

    private class MyCallable implements Callable<Boolean>
    {
        protected AbstractCollection<V> mCollection;
        protected V                     mValue;

        public MyCallable(AbstractCollection<V> collection, V value)
        {
            mCollection = collection;
            mValue      = value;
        }

        @Override
        public Boolean call() throws Exception
        {
            boolean ok = mCollection.contains(mValue);
            return ok;
        }
    } // class MyCallable

} // class CollectionGroup

Ответы [ 5 ]

2 голосов
/ 30 августа 2011

содержит не остановится только потому, что вы могли найти значение в другом потоке.Единственный способ сделать это - зацикливаться.

Для процесса, интенсивно использующего процессор, оптимальным числом потоков, вероятно, будет количество ядер, которое у вас есть.Создание слишком большого количества потоков добавляет накладных расходов, что замедляет решение.

Вы также должны помнить о закрытии () ExecutorService, когда закончите с ним.

Если вы хотите ускорить поиск,Я бы поддержал набор всех значений, это, вероятно, будет в 10-100 раз быстрее, чем при использовании нескольких потоков.

1 голос
/ 30 августа 2011

Вы можете использовать ExecutorCompletionService.Просто сохраняйте блоки take () (take () до тех пор, пока не появится готовое Future), пока не получите истинный результат и shutdownNow () базовый ExecturService, как только вы что-то найдете.Это не приведет к немедленной остановке всех потоков, как только будет найдено значение.

0 голосов
/ 31 августа 2011

Проблема в том, что ваш метод contains_multiple_collections не ожидает завершения поиска.У меня есть два варианта.Первый будет включать некоторую реализацию асинхронного обратного вызова, где метод содержит не блокирует и, возможно, принимает объект обратного вызова / слушателя в качестве аргумента.Второй заключается в том, чтобы создать блок метода метода, пока не будет определен результат.Ниже я описал пример реализации для последнего подхода, он не тестировался, поэтому будьте осторожны ...

/*
 * contains_multiple_collections now blocks until the desired 
 * value is located or all searches have completed unsuccessfully...
 */
private boolean contains_multiple_collections(V value) {
    // thread pool
    int numberProcessors = mCollections.size();
    ExecutorService es = Executors.newFixedThreadPool(numberProcessors);

    Object lock = new Object();
    AtomicBoolean outcome = new AtomicBoolean(false);
    AtomicInteger remainingSearches = new AtomicInteger(mCollections.size());

    for (int i = 0; i < numberProcessors; i++) {
        AbstractCollection<V> collection = mCollections.get(i);
        es.submit(new MyRunnable(collection, value, lock, outcome, remainingSearches));
    }

    /* Wait for searches to run. This thread will be notified when all searches
     * complete without successfully locating the value or as soon as the
     * desired value is found.
     */
    synchronized (lock) {
        while (!outcome.get() && remainingSearches.get() > 0) {
            try {
                lock.wait();
            } catch (InterruptedException ex) {
                // do something sensible.
            }
        }
        es.shutdownNow();
    }

    return outcome.get();
}

private class MyRunnable implements Runnable {

    final AbstractCollection<V> mCollection;
    final V                     mValue;
    final Object                lock;
    final AtomicBoolean         outcome;
    final AtomicInteger         remainingSearches;

    public MyRunnable(AbstractCollection<V> mCollection, V mValue, 
            Object lock, AtomicBoolean outcome, AtomicInteger remainingSearches) {
        this.mCollection = mCollection;
        this.mValue = mValue;
        this.lock = lock;
        this.outcome = outcome;
        this.remainingSearches = remainingSearches;
    }

    public void run() {
        boolean ok = mCollection.contains(mValue);
        if (ok || remainingSearches.decrementAndGet() == 0) {
            synchronized (lock) {
                if (ok) {
                    outcome.set(true);
                }

                lock.notify();
            }
        }
    }
}
0 голосов
/ 30 августа 2011

Я предлагаю использовать статический AtomicBoolean, который устанавливается при обнаружении совпадения. Каждый поток может затем проверить значение перед продолжением.

0 голосов
/ 30 августа 2011

Вы можете многократно перебирать все фьючерсы и опрашивать их с помощью get , используя ноль или почти нулевой тайм-аут, но, вероятно, лучшей идеей будет предоставить обратный вызов всем вашим MyCallable s, что должно затем позвоните, когда совпадение найдено. Обратный вызов должен затем отменить все другие задачи, возможно, выключив ExecutorService.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...