Изменение асинхронных программ на блокирующие является более общим требованием к этой проблеме.
В Java мы можем сделать это с помощью CountDownLatch
(а также Phaser
) или LockSupport + Atomic
.
Например, если требуется изменить асинхронный вызов asyncDoSomethingAwesome(param, callback)
на блокирующий, мы могли бы написать метод «обертки», например:
ResultType doSomethingAwesome(ParamType param) {
AtomicReference<ResultType> resultContainer = new AtomicReference<>();
Thread callingThread = Thread.currentThread();
asyncDoSomethingAwesome(param, result -> {
resultContainer.set(result);
LockSupport.unpark(callingThread);
});
ResultType result;
while ((result = resultContainer.get()) == null) {
LockSupport.park();
}
return result;
}
Я думаю, что это будетдостаточно, чтобы решить вашу проблему.Однако, когда мы пишем блокирующие программы, мы обычно хотим, чтобы «тайм-аут» поддерживал стабильность системы, даже если базовый интерфейс не работает должным образом, например:
ResultType doSomethingAwesome(ParamType param, Duration timeout) throws TimeoutException {
AtomicReference<ResultType> resultContainer = new AtomicReference<>();
Thread callingThread = Thread.currentThread();
asyncDoSomethingAwesome(param, result -> {
resultContainer.set(result);
LockSupport.unpark(callingThread);
});
ResultType result;
long deadline = Instant.now().plus(timeout).toEpochMilli();
while ((result = resultContainer.get()) == null) {
if (System.currentTimeMillis() >= deadline) {
throw new TimeoutException();
}
LockSupport.parkUntil(deadline);
}
return result;
}
Иногда нам нужно более усовершенствованное управление, чтобысигнал между потоками, особенно при написании библиотек параллелизма.Например, когда нам нужно узнать, получил ли блокирующий поток сигнал от другого потока, вызывающего LockSupport.unpark
, или успешно ли этот поток уведомил блокирующий поток, это обычно нелегко реализовать с помощью стандартной библиотеки Java.Поэтому я разработал еще одну библиотеку с более полным механизмом для решения этой проблемы: https://github.com/wmx16835/experimental_java_common/blob/master/alpha/src/main/java/mingxin/wang/common/concurrent/DisposableBlocker.java
При поддержке DisposableBlocker
жизнь станет намного проще:)
ResultType doSomethingAwesome(ParamType param, Duration timeout) throws TimeoutException {
// We can use org.apache.commons.lang3.mutable.MutableObject instead of AtomicReference,
// because this object will never be accessed concurrently
MutableObject<ResultType> resultContainer = new MutableObject<>();
DisposableBlocker blocker = new DisposableBlocker();
asyncDoSomethingAwesome(param, result -> {
resultContainer.setValue(result);
blocker.unblock();
});
if (!blocker.blockFor(timeout)) {
throw new TimeoutException();
}
return resultContainer.getValue();
}