Мотивация: мне нужно установить threadKey для DelegatingSessionFactory перед тем, как я направлюсь к исходящему шлюзу Sftp, и впоследствии отключить threadKey.
В зависимости от арендатора мне нужно использовать другую учетную запись пользователя Sftp.Учетные записи пользователей являются вопросом конфигурации в моем application.yml, я не хочу писать отдельные маршруты для каждого нового арендатора.
public IntegrationFlow aDynamicSftpFlow() {
f -> f
.handle(tenantSessionDefine()) // how can I use a lambda instead?
.handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
.handle(...) // undefine sftp session
}
Для настройки threadKey требуется Message<?>
, а не только полезная нагрузка и заголовки,Поэтому я использую бин, потому что он принимает сообщение:
public class TenantSessionDefine {
private DelegatingSessionFactory delegatingSessionFactory;
public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
this.delegatingSessionFactory = delegatingSessionFactory;
}
public Message<?> defineSession(Message<?> message) {
return delegatingSessionFactory.setThreadKey(message, message.getHeaders()
.get("tenantId", String.class));
// used by SessionFactoryLocator
}
}
Я хотел бы написать это как лямбду, как в
.handle(message -> delegatingSessionFactory.setThreadKey(message,
message.getPayload().getTenant())
, но это не так просто.Лямбда, которую можно использовать с handle()
, которая принимает Message<T>
, завершает поток, потому что это пустая функция (MessageHandler
функциональный интерфейс).Другой лямбда - это GenericHandler, который не завершает поток, но требует полезной нагрузки и заголовков, а не сообщения.
Это всего лишь пример, время от времени я хотел бы использовать handle()
ссообщение в лямбде без остановки потока.Как я могу это сделать?
Обновление
DelegatingSessionFactory
не очень подходящий пример.Поскольку установка и очистка ключа потока должна происходить до и после вызова sftp, совет подходит лучше, чем определение обработчика до и после вызова.