Я пытаюсь создать очередь RPC, в которой сообщения от производителя / клиента имеют приоритет перед потребителем.Я хочу, чтобы все сообщения с приоритетом 2 обрабатывались до любого сообщения с приоритетом 1.Я также хочу, чтобы каждый потребитель мог обрабатывать 10 сообщений одновременно.Я могу заставить каждого потребителя обрабатывать 10 сообщений одновременно.Я НЕ могу установить приоритеты сообщений для работы.Ниже моя установка:
Файлы конфигурации:
@Configuration
public class QueueConfig {
public static final String QUEUE_NAME = "requests";
private int maxPriority = 2;
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public Queue requests() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", maxPriority);
return new Queue(QUEUE_NAME,true,false,false, args);
}
@Bean
public Queue replies() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", maxPriority);
return new Queue("replies",true,false,false, args);
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(replies().getName());
return container;
}
@Bean
public RabbitTemplate template() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setRoutingKey(requests().getName());
return rabbitTemplate;
}
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer container) {
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate, container);
asyncRabbitTemplate.setReceiveTimeout(90000);
return asyncRabbitTemplate;
}
}
Клиент / Производитель:
@Component
public class Client {
@Autowired
private AsyncRabbitTemplate template;
public void sendHigh(String name) {
MessagePostProcessor messageProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(2);
return message;
}
};
ListenableFuture<String> response = template.convertSendAndReceive(QueueConfig.QUEUE_NAME, (Object) name,(MessagePostProcessor) messageProcessor);
try {
response.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public void sendLow(String name) {
MessagePostProcessor messageProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(1);
return message;
}
};
ListenableFuture<String> response = template.convertSendAndReceive(QueueConfig.QUEUE_NAME, (Object) name,(MessagePostProcessor) messageProcessor);
try {
response.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
файл конфигурации 2:
@Configuration
@EnableAsync
public class ServiceConfig implements AsyncConfigurer {
@Override
@Bean
public Executor getAsyncExecutor() {
return new SimpleAsyncTaskExecutor();
}
}
Потребитель:
@Component
public class Consumer {
@RabbitListener(queues = QueueConfig.QUEUE_NAME)
public String consume(@Payload String name) {
System.out.println("Request Consumer " + name);
String result = name;
if(result.equals("john") || result.equals("john1")) {
try {
Thread.sleep(22000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return result;
}
Application.properties:
spring.rabbitmq.dynamic=true
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=localhost
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.direct.acknowledge-mode=MANUAL
Junit:
Я бы ожидал, что все john и john1 будут получены потребителем до Jeff, но этоне то поведение, которое я вижу.В основном я смотрю на этот println и ожидаю, что все john и john1 будут печатать перед jeff System.out.println ("Request Consumer" + name);
@RunWith(SpringRunner.class)
@ComponentScan(basePackages = "com.test.test")
@EnableAutoConfiguration
@SpringBootTest
public class ApplicationTests {
@Autowired AsyncClass asyncClass;
@Test
public void contextLoads() throws InterruptedException, ExecutionException {
List<Future<String>> futures = new ArrayList<>();
futures.add(asyncClass.runAsyncHigh("john"));
futures.add(asyncClass.runAsyncHigh("john"));
futures.add(asyncClass.runAsyncHigh("john"));
futures.add(asyncClass.runAsyncHigh("john"));
futures.add(asyncClass.runAsyncHigh("john"));
futures.add(asyncClass.runAsyncHigh("john"));
futures.add(asyncClass.runAsyncHigh("john"));
futures.add(asyncClass.runAsyncHigh("john"));
futures.add(asyncClass.runAsyncHigh("john"));
futures.add(asyncClass.runAsyncHigh("john"));
Thread.sleep(1000);
futures.add(asyncClass.runAsyncLow("jeff"));
futures.add(asyncClass.runAsyncHigh("john1"));
futures.add(asyncClass.runAsyncHigh("john1"));
futures.add(asyncClass.runAsyncHigh("john1"));
futures.add(asyncClass.runAsyncHigh("john1"));
futures.add(asyncClass.runAsyncHigh("john1"));
futures.add(asyncClass.runAsyncHigh("john1"));
futures.add(asyncClass.runAsyncHigh("john1"));
futures.add(asyncClass.runAsyncHigh("john1"));
futures.add(asyncClass.runAsyncHigh("john1"));
futures.add(asyncClass.runAsyncHigh("john1"));
for(Future<String> future : futures) {
future.get();
}
}
@ Открытый класс компонента AsyncClass {
@Autowired Client client;
@Async
public Future<String> runAsyncHigh(String name){
client.sendHigh(name);
return new AsyncResult<String>(name);
}
@Async
public Future<String> runAsyncLow(String name){
client.sendLow(name);
return new AsyncResult<String>(name);
}
}
Спасибо, Брайан