Авторизация OAuth2 с помощью Spring Security и Rabbitmq - PullRequest
0 голосов
/ 15 мая 2018

В настоящее время у нас есть ряд микросервисов Spring, которые обмениваются данными с конечными точками REST и очередями RabbitMQ. Мы только что внедрили безопасность OAuth2 во всех сервисах, и конечные точки REST должным образом защищены.

У нас есть библиотека, которую мы написали, которая создает bean-компоненты RabbitTemplate и AmqpAdmin, так что шаблонный код не должен выполняться в каждом сервисе. Весной мы подключаемся к серверу RabbitMQ с определенным пользователем для постоянных клиентов и другим для администраторов. Мы не хотим подключаться к серверу RabbitMQ как отдельный пользователь.

Возможно ли, если мы передадим токен доступа в заголовке сообщения кролика, настроить шаблон RabbitTemplate для проверки токена до обработки сообщения? Это то, что можно / нужно сделать в процессорах AfterReceive / BeforePublish глобально для шаблона? Или это нужно проверять индивидуально в каждом методе слушателя?

Спасибо

1 Ответ

0 голосов
/ 23 мая 2018

Мне удалось выработать решение, создав пользовательские MessageListenerContainerFactory и MessageListenerContainer.

CustomMessageListenerContainerFactory.java:


public class CustomMessageListenerContainerFactory extends AbstractRabbitListenerContainerFactory {

    DefaultTokenServices tokenServices;

    public CustomMessageListenerContainerFactory(DefaultTokenServices tokenServices) {
        this.tokenServices = tokenServices;
    }

    /**
     * Create an empty container instance.
     *
     * @return the new container instance.
     */
    @Override
    protected CustomMessageListenerContainer createContainerInstance() {
        return new CustomMessageListenerContainer(tokenServices);
    }
}

CustomMessageListenerContainer.java:


public class CustomMessageListenerContainer extends SimpleMessageListenerContainer {
    private final static String errorMessage = "No valid credentials found in request: {}";
    private final static String handlingMessage = "Handling queue: {}";
    private final static String receivedMessage = "Received Message: {}";
    private final static Logger logger = LoggerFactory.getLogger(CustomMessageListenerContainer.class);


    private DefaultTokenServices tokenServices;

    /**
     * Constructor
     *
     * @param tokenServices The instance of DefaultTokenServices used to decrypt the access token.
     */
    public CustomMessageListenerContainer(DefaultTokenServices tokenServices) {
        this.tokenServices = tokenServices;
    }

    /**
     * This method checks to see if there is a valid authorization
     *
     * @param channel   The AMQP channel on which the message was published.
     * @param messageIn The incoming message.
     * @throws Exception Throws an exception when there are no valid credentials in the message.
     */
    @Override
    protected void executeListener(Channel channel, Message messageIn) throws Exception {
        logger.info(handlingMessage, (Object[]) getQueueNames());
        logger.info(receivedMessage, BeanUtils.beanProperties(messageIn));
        if (messageIn.getMessageProperties().getHeaders().keySet().stream().anyMatch(t -> Objects.equals(t.toLowerCase(), "authorization"))) {
            String accessKey = messageIn.getMessageProperties()
                    .getHeaders()
                    .keySet()
                    .stream()
                    .filter(t -> Objects.equals(t.toLowerCase(), "authorization"))
                    .findFirst()
                    .get();
            OAuth2Authentication auth = tokenServices.loadAuthentication(messageIn.getMessageProperties().getHeaders().get(accessKey).toString());
            // If the token is expired, there will be no auth.
            if (auth != null) {
                SecurityContextHolder.getContext().setAuthentication(auth);
                super.executeListener(channel, messageIn);
                return;
            }
        }
        rejectMessage(channel, messageIn);
    }

    private void rejectMessage(Channel channel, Message messageIn) throws Exception {
        logger.info(errorMessage, (Object[]) getQueueNames());
        String localMessage = errorMessage.replace("{}", String.join(", ", getQueueNames()));
        if (messageIn.getMessageProperties().getReplyTo() != null) {
            channel.basicPublish("",
                    messageIn.getMessageProperties().getReplyTo(),
                    new AMQP.BasicProperties.Builder()
                            .contentType("application/json")
                            .correlationId(messageIn.getMessageProperties().getCorrelationId())
                            .build(),
                    "{\"errorMessage\":\"".concat(localMessage).concat("\"}").getBytes());
        }
        throw new AmqpRejectAndDontRequeueException(localMessage);
    }
}

CustomRabbitListenerConfigurer.java: <pre> ... @Override public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); CustomMessageListenerContainerfactory factory = new CustomMessageListenerContainerfactory(tokenServices); ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class); factory.setConnectionFactory(connectionFactory); registrar.setContainerFactory(factory); } ...

...