Spring Integration DSL: лямбда для возврата сообщения <T>в методе handle, например с DelegatingSessionFactory? - PullRequest
0 голосов
/ 02 марта 2019

Мотивация: мне нужно установить threadKey для DelegatingSessionFactory перед тем, как я направлюсь к исходящему шлюзу Sftp, и впоследствии отключить threadKey.

В зависимости от арендатора мне нужно использовать другую учетную запись пользователя Sftp.Учетные записи пользователей являются вопросом конфигурации в моем application.yml, я не хочу писать отдельные маршруты для каждого нового арендатора.

public IntegrationFlow aDynamicSftpFlow() {
    f -> f
        .handle(tenantSessionDefine()) // how can I use a lambda instead?
        .handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
        .handle(...) // undefine sftp session
}

Для настройки threadKey требуется Message<?>, а не только полезная нагрузка и заголовки,Поэтому я использую бин, потому что он принимает сообщение:

public class TenantSessionDefine {


    private DelegatingSessionFactory delegatingSessionFactory;

    public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
        this.delegatingSessionFactory = delegatingSessionFactory;
    }

    public Message<?> defineSession(Message<?> message) {
        return delegatingSessionFactory.setThreadKey(message, message.getHeaders()
            .get("tenantId", String.class)); 
// used by SessionFactoryLocator
    }
}

Я хотел бы написать это как лямбду, как в

.handle(message -> delegatingSessionFactory.setThreadKey(message, 
    message.getPayload().getTenant())

, но это не так просто.Лямбда, которую можно использовать с handle(), которая принимает Message<T>, завершает поток, потому что это пустая функция (MessageHandler функциональный интерфейс).Другой лямбда - это GenericHandler, который не завершает поток, но требует полезной нагрузки и заголовков, а не сообщения.

Это всего лишь пример, время от времени я хотел бы использовать handle() ссообщение в лямбде без остановки потока.Как я могу это сделать?

Обновление

DelegatingSessionFactory не очень подходящий пример.Поскольку установка и очистка ключа потока должна происходить до и после вызова sftp, совет подходит лучше, чем определение обработчика до и после вызова.

1 Ответ

0 голосов
/ 02 марта 2019

Понял.Javadoc для handle() говорит

Используйте handle(Class, GenericHandler), если вам нужен доступ ко всему сообщению.

Параметр Class должен быть Message.class:

.handle(Message.class, 
    (message, headers) -> sftpSessionFactory
        .setThreadKey(message, headers.get("tenantId")))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...