Вызов асинхронных методов (Vert.x, Java) из обязательно синхронных - PullRequest
2 голосов
/ 20 марта 2019

У нас есть набор Java-приложений, которые изначально были написаны с использованием обычных синхронных методов, но в основном были преобразованы в асинхронный Vert.x (обычный API, а не Rx) везде, где это имеет смысл. У нас возникают некоторые проблемы на границах между синхронизацией и асинхронным кодом, особенно когда у нас есть метод, который должен быть синхронным (рассуждения объяснены ниже), и мы хотим вызвать из него асинхронный метод.

Есть много похожих вопросов, заданных ранее о переполнении стека, но практически все они находятся в контексте C #, и ответы, похоже, не применяются.

Среди прочего мы используем Geotools и Apache Shiro. Оба обеспечивают настройку через расширение с использованием определенных ими API, которые являются строго синхронными. В качестве конкретного примера, нашей пользовательской области авторизации для Shiro необходим доступ к нашему хранилищу пользовательских данных, для которого мы создали асинхронный DAO API. Метод Широ, который мы должны написать, называется doGetAuthorizationInfo;, ожидается, что он вернет AuthorizationInfo. Но, похоже, нет надежного способа доступа к данным авторизации с другой стороны асинхронного API DAO.

В конкретном случае, когда поток не был создан Vert.x, использование CompletableFuture является работоспособным решением: синхронный doGetAuthorizationInfo перенесет асинхронную работу в поток Vert.x, а затем заблокирует текущий нить в CompletableFuture.get(), пока результат не станет доступным.

К сожалению, метод Shiro (или Geotools, или любой другой) может быть вызван в потоке Vert.x. В этом случае крайне плохо блокировать текущий поток: если это поток цикла событий, то мы нарушаем золотое правило, а если это рабочий поток (скажем, через Vertx.executeBlocking), то его блокирование не позволит работнику получение чего-либо еще из своей очереди - это означает, что блокировка будет постоянной.

Есть ли "стандартное" решение этой проблемы? Мне кажется, что он будет появляться всякий раз, когда Vert.x используется в расширяемой синхронной библиотеке. Это просто ситуация, которой люди избегают?

EDIT

... с немного большей детализацией. Вот фрагмент из org.apache.shiro.realm.AuthorizingRealm:

/**
 * Retrieves the AuthorizationInfo for the given principals from the underlying data store.  When returning
 * an instance from this method, you might want to consider using an instance of
 * {@link org.apache.shiro.authz.SimpleAuthorizationInfo SimpleAuthorizationInfo}, as it is suitable in most cases.
 *
 * @param principals the primary identifying principals of the AuthorizationInfo that should be retrieved.
 * @return the AuthorizationInfo associated with this principals.
 * @see org.apache.shiro.authz.SimpleAuthorizationInfo
 */
protected abstract AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals);

Наш уровень доступа к данным имеет такие методы:

void loadUserAccount(String id, Handler<AsyncResult<UserAccount>> handler);

Как мы можем призвать последнего из первого? Если бы мы знали, что doGetAuthorizationInfo вызывается в потоке, отличном от Vert.x, мы могли бы сделать что-то вроде этого:

@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
    CompletableFuture<UserAccount> completable = new CompletableFuture<>();
    vertx.<UserAccount>executeBlocking(vertxFuture -> {
        loadUserAccount((String) principals.getPrimaryPrincipal(), vertxFuture);
    }, res -> {
        if (res.failed()) {
            completable.completeExceptionally(res.cause());
        } else {
            completable.complete(res.result());
        }
    });

    // Block until the Vert.x worker thread provides its result.
    UserAccount ua = completable.get();

    // Build up authorization info from the user account
    return new SimpleAuthorizationInfo(/* etc...*/);
}

Но если doGetAuthorizationInfo вызывается в потоке Vert.x, тогда все совершенно иначе. Приведенный выше трюк заблокирует поток цикла событий, так что это не нужно. Или, если это рабочий поток, то вызов executeBlocking поместит задачу loadUserAccount в очередь для этого же рабочего (я полагаю), поэтому последующий completable.get() будет блокироваться навсегда.

1 Ответ

3 голосов
/ 20 марта 2019

Могу поспорить, что вы уже знаете ответ, но хотели бы, чтобы это было не так - если вызов GeoTools или Shiro нужно будет заблокировать в ожидании ответа от чего-либо, вам не следует делать этот вызов наПоток Vert.x.

Вы должны создать ExecutorService с пулом потоков, который вы должны использовать для выполнения этих вызовов, и организовывать для каждой переданной задачи отправку сообщения Vert.x, когда это будет сделано.

У вас может быть некоторая гибкость в отношении размера чанков, которые вы перемещаете в пул потоков.Вместо того, чтобы плотно обернуть эти вызовы, вы можете переместить что-то большее выше в стек вызовов.Вы, вероятно, примете это решение, основываясь на том, сколько кода вам придется изменить.Поскольку создание асинхронного метода обычно подразумевает изменение всех синхронных методов в его стеке вызовов в любом случае (что является печальной фундаментальной проблемой для такого рода асинхронной модели), вы, вероятно, захотите сделать это высоко в стеке.

Возможно, в конечном итоге вы получите слой адаптера, который предоставляет API-интерфейсы Vert.x для различных синхронных служб.

...