RabbitMQ: Открытие нескольких каналов, как уменьшить это? - PullRequest
0 голосов
/ 02 мая 2020

У меня есть базовый c класс планировщика, который продолжает читать БД для заданий и отправляет задание в RabbitMQ, если критерии удовлетворяют.

@Scheduled(fixedRate = 10000)
void enableScheduledJob(){
    List<ScheduledDeploymentJob> deploymentJobs = scheduledJobRepositoryCustom.findByStartTimeRunBetweenAndExecutedAndBoolActive(fromDate, toDate, false, true, DEPLOYMENT_JOB);
    List<ScheduledDeploymentJob> testingJobs = scheduledJobRepositoryCustom.findByStartTimeRunBetweenAndExecutedAndBoolActive(fromDate, toDate, false, true, TESTING_JOB);


    if(deploymentJobs != null && !deploymentJobs.isEmpty()){

        try {
            Properties queueProperties = rabbitMqSenderConfig.amqpAdmin().getQueueProperties(SCHEDULED_QUEUE_NAME);
            logger.info("queueProperties -> "+queueProperties);
            if(queueProperties == null) {
                Queue queue = new Queue(SCHEDULED_QUEUE_NAME, true);
                rabbitMqSenderConfig.amqpAdmin().declareQueue(queue);
                rabbitMqSenderConfig.amqpAdmin().declareExchange(new DirectExchange(SCHEDULED_JOB_EXCHANGE));
                rabbitMqSenderConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange(SCHEDULED_JOB_EXCHANGE)).withQueueName());
            }
            if(queueProperties != null) {
                ScheduledRabbitMQConsumer container = new ScheduledRabbitMQConsumer();
                container.setConnectionFactory(rabbitMqSenderConfig.connectionFactory());
                container.setQueueNames(SCHEDULED_QUEUE_NAME);
                container.setMessageListener(new MessageListenerAdapter(
                        new ScheduledRabbitMQHandler(scheduledJobRepositoryCustom,
                                sfdcConnectionDetailsMongoRepository, scheduledDeploymentMongoRepository,
                                scheduledLinkedServicesMongoRepository),
                        new Jackson2JsonMessageConverter()));
                logger.info("Started Consumer called from saveSfdcConnectionDetails");
                container.startConsumers();
                for (ScheduledDeploymentJob scheduledDeploymentJob : deploymentJobs) {
                    logger.info("Send to RabbitMQ -> " + scheduledDeploymentJob.getGitRepoId());
                    rabbitTemplateCustomAdmin.convertAndSend(SCHEDULED_JOB_EXCHANGE, SCHEDULED_QUEUE_NAME, scheduledDeploymentJob);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

RabbitMQ Config class:

@EnableRabbit
@Configuration
public class RabbitMqSenderConfig {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqSenderConfig.class);

    @Value("${spring.rabbitmq.addresses}")
    private String addressURL;


    @Bean
    public ConnectionFactory connectionFactory() throws URISyntaxException {
        return new CachingConnectionFactory(new URI(addressURL));
    }

    /**
     * Required for executing administration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() throws URISyntaxException {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() throws URISyntaxException {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }


}

Код обработчика:

public class ScheduledRabbitMQHandler {

    public void handleMessage(ScheduledDeploymentJob scheduledDeploymentJob) {
     //processing here
    }
}

В соответствии с передовой практикой мы показываем одно длительное соединение вместо создания соединений, а также часто закрываем и открываем канал внутри этого соединения. Но когда это вызывается: ScheduledRabbitMQConsumer container = new ScheduledRabbitMQConsumer(); всегда создает новый канал.

Кроме того, поскольку я использую собственный обработчик, как я могу убедиться, что каналы закрываются после обработки сообщения, чтобы не было нежелательные каналы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...