Spring AMQP RabbitMQ RPC Очередь с приоритетами - PullRequest
0 голосов
/ 10 декабря 2018

Я пытаюсь создать очередь RPC, в которой сообщения от производителя / клиента имеют приоритет перед потребителем.Я хочу, чтобы все сообщения с приоритетом 2 обрабатывались до любого сообщения с приоритетом 1.Я также хочу, чтобы каждый потребитель мог обрабатывать 10 сообщений одновременно.Я могу заставить каждого потребителя обрабатывать 10 сообщений одновременно.Я НЕ могу установить приоритеты сообщений для работы.Ниже моя установка:

Файлы конфигурации:

@Configuration
public class QueueConfig  { 
    public static final String QUEUE_NAME = "requests";

    private int maxPriority = 2;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public Queue requests() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-priority", maxPriority);
        return new Queue(QUEUE_NAME,true,false,false, args);
    }

    @Bean
    public Queue replies() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-priority", maxPriority);
        return new Queue("replies",true,false,false, args);
    }  

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(replies().getName());
        return container;
    }

    @Bean
    public RabbitTemplate template() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setRoutingKey(requests().getName());
        return rabbitTemplate;
    }

    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer container) {
        AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate, container);
        asyncRabbitTemplate.setReceiveTimeout(90000);
        return asyncRabbitTemplate;
    }        
}

Клиент / Производитель:

@Component
public class Client {
    @Autowired
    private AsyncRabbitTemplate template;

    public void sendHigh(String name) {

        MessagePostProcessor messageProcessor = new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(2);
                return message;
            }
        };

        ListenableFuture<String> response = template.convertSendAndReceive(QueueConfig.QUEUE_NAME, (Object) name,(MessagePostProcessor) messageProcessor);
        try {
            response.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public void sendLow(String name) {

        MessagePostProcessor messageProcessor = new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(1);
                return message;
            }
        };

        ListenableFuture<String> response = template.convertSendAndReceive(QueueConfig.QUEUE_NAME, (Object) name,(MessagePostProcessor) messageProcessor);
        try {
            response.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

файл конфигурации 2:

@Configuration
@EnableAsync
public class ServiceConfig implements AsyncConfigurer {

    @Override
    @Bean
    public Executor getAsyncExecutor() {
        return new SimpleAsyncTaskExecutor();
    }


}

Потребитель:

@Component
public class Consumer  {

    @RabbitListener(queues = QueueConfig.QUEUE_NAME)
    public String consume(@Payload String name) {
        System.out.println("Request Consumer " + name);
        String result = name;
        if(result.equals("john") || result.equals("john1")) {
            try {
                Thread.sleep(22000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return result;
    }

Application.properties:

spring.rabbitmq.dynamic=true
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=localhost
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.direct.acknowledge-mode=MANUAL

Junit:

Я бы ожидал, что все john и john1 будут получены потребителем до Jeff, но этоне то поведение, которое я вижу.В основном я смотрю на этот println и ожидаю, что все john и john1 будут печатать перед jeff System.out.println ("Request Consumer" + name);

@RunWith(SpringRunner.class)
@ComponentScan(basePackages = "com.test.test")
@EnableAutoConfiguration
@SpringBootTest
public class ApplicationTests {

    @Autowired AsyncClass asyncClass;

    @Test
    public void contextLoads() throws InterruptedException, ExecutionException {
        List<Future<String>> futures = new ArrayList<>();
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        Thread.sleep(1000);
        futures.add(asyncClass.runAsyncLow("jeff"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));

        for(Future<String> future : futures) {
            future.get();
        }


    }

@ Открытый класс компонента AsyncClass {

@Autowired Client client;

@Async
public Future<String> runAsyncHigh(String name){
    client.sendHigh(name);
    return new AsyncResult<String>(name);
}

@Async
public Future<String> runAsyncLow(String name){
    client.sendLow(name);
    return new AsyncResult<String>(name);
}

}

Спасибо, Брайан

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...