Дождитесь завершения любого из Future <T> - PullRequest
53 голосов
/ 23 сентября 2008

У меня работает несколько асинхронных задач, и мне нужно подождать, пока хотя бы одна из них будет завершена (в будущем, вероятно, мне придется подождать, пока утилита M из N будет завершена). В настоящее время они представлены как Future, поэтому мне нужно что-то вроде

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

Есть что-нибудь подобное? Или что-то подобное, не обязательно для будущего. В настоящее время я перебираю коллекцию фьючерсов, проверяю, завершено ли это, затем сплю некоторое время и проверяю снова. Это выглядит не лучшим решением, потому что, если я сплю в течение длительного периода времени, добавляется нежелательная задержка, если я сплю в течение короткого периода времени, это может повлиять на производительность.

Я мог бы попытаться использовать

new CountDownLatch(1)

и уменьшите обратный отсчет после завершения задачи и выполните

countdown.await()

, но я обнаружил, что это возможно, только если я буду контролировать создание в будущем. Это возможно, но требует редизайна системы, потому что в настоящее время логика создания задач (отправка Callable в ExecutorService) отделена от решения ждать, какое будущее. Я мог бы также переопределить

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

и создайте пользовательскую реализацию RunnableFuture с возможностью прикрепить прослушиватель, чтобы получать уведомления о завершении задачи, затем присоединить такого прослушивателя к необходимым задачам и использовать CountDownLatch, но это означает, что мне нужно переопределить newTaskFor для каждого используемого ExecutorService - и, возможно, будет реализация, которая не расширяет AbstractExecutorService. Я мог бы также попытаться обернуть данный ExecutorService для той же цели, но затем я должен украсить все методы, производящие Futures.

Все эти решения могут работать, но кажутся очень неестественными. Похоже, я упускаю что-то простое, как

WaitHandle.WaitAny(WaitHandle[] waitHandles)

в c #. Есть ли известные решения для такого рода проблем?

UPDATE:

Изначально у меня вообще не было доступа к созданию будущего, поэтому не было элегантного решения. После редизайна системы я получил доступ к созданию Future и смог добавить countDownLatch.countdown () в процесс выполнения, затем я могу считать countDownLatch.await (), и все работает отлично. Спасибо за другие ответы, я не знал о ExecutorCompletionService, и он действительно может быть полезен в подобных задачах, но в данном конкретном случае его нельзя использовать, потому что некоторые Futures создаются без какого-либо исполнителя - фактическая задача отправляется на другой сервер через сеть, завершается удаленно и уведомление о завершении получено.

Ответы [ 7 ]

55 голосов
/ 23 сентября 2008

просто, проверьте ExecutorCompletionService .

9 голосов
/ 23 сентября 2008
7 голосов
/ 23 сентября 2008

Почему бы просто не создать очередь результатов и ждать в очереди? Или, проще, используйте CompletionService, поскольку это то, что он есть: ExecutorService + очередь результатов.

6 голосов
/ 23 сентября 2008

На самом деле это довольно просто с wait () и notifyAll ().

Сначала определите объект блокировки. (Вы можете использовать любой класс для этого, но я хотел бы быть явным):

package com.javadude.sample;

public class Lock {}

Далее определите ваш рабочий поток. Он должен уведомить этот объект блокировки, когда он закончит свою обработку. Обратите внимание, что уведомление должно быть в синхронизированной блокировке блока объекта блокировки.

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

Наконец, вы можете написать свой клиент:

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}
4 голосов
/ 23 сентября 2008

Насколько я знаю, Java не имеет структуры, аналогичной методу WaitHandle.WaitAny.

Мне кажется, что этого можно достичь с помощью декоратора "WaitableFuture":

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

Хотя это будет работать только в том случае, если его можно вставить перед кодом выполнения, поскольку в противном случае код выполнения не будет иметь нового метода doTask(). Но я действительно не вижу способа сделать это без опроса, если вы не можете каким-то образом получить контроль над объектом Future перед выполнением.

Или, если будущее всегда работает в своем собственном потоке, и вы можете каким-то образом получить этот поток. Затем вы можете создать новый поток для присоединения к другому потоку, а затем обработать механизм ожидания после того, как соединение вернется ... Это было бы очень уродливо и вызвало бы много накладных расходов. И если некоторые объекты Future не завершаются, у вас может быть много заблокированных потоков в зависимости от мертвых потоков. Если вы не будете осторожны, это может привести к утечке памяти и системных ресурсов.

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}
0 голосов
/ 08 января 2009

См. Эту опцию:

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}
0 голосов
/ 23 сентября 2008

Так как вам все равно, какой из них заканчивается, почему бы просто не использовать один WaitHandle для всех потоков и ждать этого? Зависит от того, кто первым заканчивает, может установить ручку.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...