У меня есть базовый c класс планировщика, который продолжает читать БД для заданий и отправляет задание в RabbitMQ, если критерии удовлетворяют.
@Scheduled(fixedRate = 10000)
void enableScheduledJob(){
List<ScheduledDeploymentJob> deploymentJobs = scheduledJobRepositoryCustom.findByStartTimeRunBetweenAndExecutedAndBoolActive(fromDate, toDate, false, true, DEPLOYMENT_JOB);
List<ScheduledDeploymentJob> testingJobs = scheduledJobRepositoryCustom.findByStartTimeRunBetweenAndExecutedAndBoolActive(fromDate, toDate, false, true, TESTING_JOB);
if(deploymentJobs != null && !deploymentJobs.isEmpty()){
try {
Properties queueProperties = rabbitMqSenderConfig.amqpAdmin().getQueueProperties(SCHEDULED_QUEUE_NAME);
logger.info("queueProperties -> "+queueProperties);
if(queueProperties == null) {
Queue queue = new Queue(SCHEDULED_QUEUE_NAME, true);
rabbitMqSenderConfig.amqpAdmin().declareQueue(queue);
rabbitMqSenderConfig.amqpAdmin().declareExchange(new DirectExchange(SCHEDULED_JOB_EXCHANGE));
rabbitMqSenderConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange(SCHEDULED_JOB_EXCHANGE)).withQueueName());
}
if(queueProperties != null) {
ScheduledRabbitMQConsumer container = new ScheduledRabbitMQConsumer();
container.setConnectionFactory(rabbitMqSenderConfig.connectionFactory());
container.setQueueNames(SCHEDULED_QUEUE_NAME);
container.setMessageListener(new MessageListenerAdapter(
new ScheduledRabbitMQHandler(scheduledJobRepositoryCustom,
sfdcConnectionDetailsMongoRepository, scheduledDeploymentMongoRepository,
scheduledLinkedServicesMongoRepository),
new Jackson2JsonMessageConverter()));
logger.info("Started Consumer called from saveSfdcConnectionDetails");
container.startConsumers();
for (ScheduledDeploymentJob scheduledDeploymentJob : deploymentJobs) {
logger.info("Send to RabbitMQ -> " + scheduledDeploymentJob.getGitRepoId());
rabbitTemplateCustomAdmin.convertAndSend(SCHEDULED_JOB_EXCHANGE, SCHEDULED_QUEUE_NAME, scheduledDeploymentJob);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
RabbitMQ Config class:
@EnableRabbit
@Configuration
public class RabbitMqSenderConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitMqSenderConfig.class);
@Value("${spring.rabbitmq.addresses}")
private String addressURL;
@Bean
public ConnectionFactory connectionFactory() throws URISyntaxException {
return new CachingConnectionFactory(new URI(addressURL));
}
/**
* Required for executing administration functions against an AMQP Broker
*/
@Bean
public AmqpAdmin amqpAdmin() throws URISyntaxException {
return new RabbitAdmin(connectionFactory());
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate rabbitTemplate() throws URISyntaxException {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
}
Код обработчика:
public class ScheduledRabbitMQHandler {
public void handleMessage(ScheduledDeploymentJob scheduledDeploymentJob) {
//processing here
}
}
В соответствии с передовой практикой мы показываем одно длительное соединение вместо создания соединений, а также часто закрываем и открываем канал внутри этого соединения. Но когда это вызывается: ScheduledRabbitMQConsumer container = new ScheduledRabbitMQConsumer();
всегда создает новый канал.
Кроме того, поскольку я использую собственный обработчик, как я могу убедиться, что каналы закрываются после обработки сообщения, чтобы не было нежелательные каналы.