Как правильно синхронизировать / заблокировать при использовании CountDownLatch - PullRequest
4 голосов
/ 04 декабря 2010

Это сводится к тому, что один поток отправляет работу через какой-то сервис. Работа выполняется в некотором TPExecutor. После этого эта служба проверяет результаты и создает исключение в исходном потоке при определенных условиях (задание превышает максимальное количество повторных попыток и т. Д.). Фрагмент кода ниже примерно иллюстрирует этот сценарий в устаревшем коде:

import java.util.concurrent.CountDownLatch;

public class IncorrectLockingExample {

private static class Request {

    private final CountDownLatch latch = new CountDownLatch(1);

    private Throwable throwable;

    public void await() {
        try {
            latch.await();
        } catch (InterruptedException ignoredForDemoPurposes) {
        }
    }

    public void countDown() {
        latch.countDown();
    }

    public Throwable getThrowable() {
        return throwable;
    }

    public void setThrowable(Throwable throwable) {
        this.throwable = throwable;
    }

}

private static final Request wrapper = new Request();

public static void main(String[] args) throws InterruptedException {

    final Thread blockedThread = new Thread() {
        public void run() {
            wrapper.await();
            synchronized (wrapper) {
                if (wrapper.getThrowable() != null)
                    throw new RuntimeException(wrapper.getThrowable());
            }
        }
    };

    final Thread workingThread = new Thread() {
        public void run() {
            wrapper.setThrowable(new RuntimeException());
            wrapper.countDown();

        }
    };

    blockedThread.start();
    workingThread.start();

    blockedThread.join();
    workingThread.join();
}

}

Иногда (не воспроизводится на моем компьютере, но происходит на 16-ядерном сервере), исключение не передается в исходный поток. Я думаю, это потому, что не происходит принудительное выполнение before-before (например, 'countDown' происходит до 'setThrowable') и программа продолжает работать (но должна завершиться ошибкой). Буду признателен за любую помощь о том, как решить это дело. Ограничения: выпуск через неделю, требуется минимальное влияние на существующую кодовую базу.

Ответы [ 2 ]

6 голосов
/ 04 декабря 2010

Приведенный выше код (как сейчас обновленный) должен работать так, как вы ожидаете, без использования дополнительных механизмов синхронизации. Барьер памяти и соответствующие ему отношения 'случается до' устанавливаются с помощью методов CountDownLatch await() и countdown().

Из документов API :

Действия до «освобождения» методов синхронизатора, таких как Lock.unlock, Semaphore.release и CountDownLatch.countDown, выполняемых до действия после успешного метода «получения», такого как Lock.lock, Semaphore.acquire, Condition.await и CountDownLatch.await для того же объекта синхронизатора в другом потоке.

Если вы имеете дело с параллелизмом на регулярной основе, получите себе копию 'Java Concurrency in Practice' , это библия Java-параллелизма и будет хорошо стоить на вашей книжной полке :-).

2 голосов
/ 04 декабря 2010

Я подозреваю, что вам нужно

private volatile Throwable throwable

Вы пытались использовать ExecutorService, поскольку он встроен, и делает это для вас. Следующие отпечатки

future1 := result
future2  threw java.lang.IllegalStateException
future3  timed out

Код

public static void main(String... args)  {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future1 = executor.submit(new Callable<String>() {
        public String call() throws Exception {
            return "result";
        }
    });

    Future<String> future2 = executor.submit(new Callable<String>() {
        public String call() throws Exception {
            throw new IllegalStateException();
        }
    });

    Future<String> future3 = executor.submit(new Callable<String>() {
        public String call() throws Exception {
            Thread.sleep(2000);
            throw new AssertionError();
        }
    });

    printResult("future1", future1);
    printResult("future2", future2);
    printResult("future3", future3);
    executor.shutdown();
}

private static void printResult(String description, Future<String> future) {
    try {
        System.out.println(description+" := "+future.get(1, TimeUnit.SECONDS));
    } catch (InterruptedException e) {
        System.out.println(description+"  interrupted");
    } catch (ExecutionException e) {
        System.out.println(description+"  threw "+e.getCause());
    } catch (TimeoutException e) {
        System.out.println(description+"  timed out");
    }
}

В коде для FutureTask есть комментарий.

/**
 * The thread running task. When nulled after set/cancel, this
 * indicates that the results are accessible.  Must be
 * volatile, to ensure visibility upon completion.
 */

Если вы не собираетесь повторно использовать код в JDK, все же стоит прочитать его, чтобы вы могли ознакомиться с любыми приемами, которые они используют.

...