Как связать время вычислений в Java и получить все результаты, вычисленные до сих пор, даже когда бюджет времени заканчивается (тайм-аут)? - PullRequest
4 голосов
/ 15 февраля 2010

В рамках разработки, которую я разрабатываю, пользователь может выбрать выполнение некоторого трудоемкого задания в фоновом режиме и выполнение чего-то еще. Эта задача вычисляет ряд результатов. В какой-то момент, когда ему / ей нужны результаты фоновой задачи, допустимо подождать еще некоторое время до:

  • a) происходит тайм-аут (в этом случае пользователь хотел бы получить все результаты, вычисленные до сих пор, если они существуют); или

  • b) достигнуто максимальное число или вычисленные результаты (нормальное окончание),

, что произойдет раньше.

Гоча: Даже если время ожидания истекло, пользователь все еще хочет вычислить результаты до сих пор.

Я пытался сделать это, используя Future<V>.get(long timeout, TimeUnit unit) и Callable<V> класс, полученный, но случается, что когда возникает TimeoutException, это обычно означает задача была преждевременно завершена, поэтому никаких результатов нет . Поэтому мне пришлось добавить метод getPartialResults() (см. DiscoveryTask ниже), и я боюсь, что это использование слишком нелогично для потенциальных пользователей.

Вызов обнаружения:

public Set<ResourceId> discover(Integer max, long timeout, TimeUnit unit)
    throws DiscoveryException
{
    DiscoveryTask task = new DiscoveryTask(max);
    Future<Set<ResourceId>> future = taskExec.submit(task);
    doSomethingElse();
    try {
        return future.get(timeout, unit);
    } catch (CancellationException e) {
        LOG.debug("Discovery cancelled.", e);
    } catch (ExecutionException e) {
        throw new DiscoveryException("Discovery failed to execute.", e);
    } catch (InterruptedException e) {
        LOG.debug("Discovery interrupted.", e);
    } catch (TimeoutException e) {
        LOG.debug("Discovery time-out.");
    } catch (Exception e) {
        throw new DiscoveryException("Discovery failed unexpectedly.", e);
    } finally {
        // Harmless if task already completed
        future.cancel(true); // interrupt if running
    }
    return task.getPartialResults(); // Give me what you have so far!
}

Реализация открытия:

public class DiscoveryTask extends Callable<Set<ResourceId>> 
        implements DiscoveryListener
{
    private final DiscoveryService discoveryService;
    private final Set<ResourceId> results;
    private final CountDownLatch doneSignal;
    private final MaximumLimit counter;
    //...
    public DiscoveryTask(Integer maximum) {
        this.discoveryService = ...;
        this.results = Collections.synchronizedSet(new HashSet<ResourceId>());
        this.doneSignal = new CountDownLatch(1);
        this.counter = new MaximumLimit(maximum);
        //...
    }

    /**
     * Gets the partial results even if the task was canceled or timed-out.
     * 
     * @return The results discovered until now.
     */
    public Set<ResourceId> getPartialResults() {
        Set<ResourceId> partialResults = new HashSet<ResourceId>();
        synchronized (results) {
            partialResults.addAll(results);
        }
        return Collections.unmodifiableSet(partialResults);
    }

    public Set<ResourceId> call() throws Exception {
        try {
            discoveryService.addDiscoveryListener(this);
            discoveryService.getRemoteResources();
            // Wait...
            doneSignal.await();
        } catch (InterruptedException consumed) {
            LOG.debug("Discovery was interrupted.");
        } catch (Exception e) {
            throw new Exception(e);
        } finally {
            discoveryService.removeDiscoveryListener(this);
        }
        LOG.debug("Discovered {} resource(s).", results.size());
        return Collections.unmodifiableSet(results);
    }

    // DiscoveryListener interface
    @Override
    public void discoveryEvent(DiscoveryEvent de) {
        if (counter.wasLimitReached()) {
            LOG.debug("Ignored discovery event {}. "
                + "Maximum limit of wanted resources was reached.", de);
            return;
        }
        if (doneSignal.getCount() == 0) {
            LOG.debug("Ignored discovery event {}. "
                + "Discovery of resources was interrupted.", de);
            return;
        }
        addToResults(de.getResourceId());
    }

    private void addToResults(ResourceId id) {
        if (counter.incrementUntilLimitReached()) {
            results.add(id);
        } else {
            LOG.debug("Ignored resource {}. Maximum limit reached.",id);
            doneSignal.countDown();
        }
    }
}

В главе 6 книги Параллелизм Java на практике от Брайана Гетца и др. Авторы показывают решение для связанной проблемы, но в этом случае все результаты могут быть вычислены параллельно, что это не мой случай. Если быть точным, мои результаты зависят от внешних источников, поэтому я не могу контролировать, когда они появятся. Мой пользователь определяет желаемое максимальное количество результатов, которое он хочет, прежде чем вызывать выполнение задачи, и максимальный лимит времени, который она согласилась подождать после того, как будет готова получить результаты.

Это нормально для тебя? Вы бы сделали это по-другому? Есть ли лучший подход?

Ответы [ 3 ]

2 голосов
/ 15 февраля 2010

Передайте (более короткий) тайм-аут самой задаче и сделайте его преждевременным, когда он достигнет этого «мягкого тайм-аута». Тип результата может иметь флаг, указывающий, является ли результат идеальным или нет:

Future<Result> future = exec.submit(new Task(timeout*.9));
//if you get no result here then the task misbehaved, 
//i.e didn't obey the soft timeout. 
//This should be treated as a bug
Result result = future.get(timeout);
if (result.completedInTime()) {
    doSomethingWith(result.getData());        
} else {
    doSomethingElseWith(result.getData());       
}
0 голосов
/ 15 февраля 2010

Примерно так:

public static class Consumer<T> {

    private BlockingQueue<T> queue;
    private T lastElement;

    public Consumer(BlockingQueue<T> queue, T lastElement) {
        this.queue = queue;
        this.lastElement = lastElement;
    }

    public Collection<T> acquireResults(long timeout, TimeUnit timeoutTimeUnit) throws InterruptedException {

        LinkedList<T> result = new LinkedList<T>();
        queue.drainTo(result);

        if (result.getLast() == lastElement)
            return result;
        else
            result.add(queue.poll(timeout, timeoutTimeUnit));

        result.removeLast();

        return result;

    }

}

public static void main(String[] args) throws InterruptedException {

    String lastElement = "_lastElement";

    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    Consumer<String> consumer = new Consumer<String>(queue, lastElement);

    for (int i=0; i<100; i++)
        queue.put(UUID.randomUUID().toString());

    System.out.println(consumer.acquireResults(5, TimeUnit.SECONDS));

    queue.put("foo");
    queue.put("bar");

    queue.put(lastElement);

    System.out.println(consumer.acquireResults(5, TimeUnit.SECONDS));

}

Возможно, вы захотите использовать обертку для результата, чтобы указать, что какой-то результат действительно является последним, а не магическим значением.

0 голосов
/ 15 февраля 2010

Если отдельные результаты оказываются «дешевыми», тогда заставьте свою подпрограмму генерировать результаты и добавьте их в очередь вывода и посмотрите, сколько времени осталось, и сравните его со временем, которое заняли текущие результаты. Рассчитайте следующий результат, только если вы можете сделать это вовремя.

Это позволит вам оставаться однопоточным, что обычно делает ваш код проще и, следовательно, менее подверженным ошибкам.

...