Создать Exchange и очереди в брокере Qpid - PullRequest
0 голосов
/ 16 мая 2018

Привет. Я тестирую проект весенней интеграции с использованием встроенного брокера Qpid. Но проблема в том, КАК МОЖНО делать очереди и обмены в qpid. Я думал, что rabbit-config.xml сделает очереди и обмены в брокере qpid, но безрезультатно. Мой поток - создание очередей и обмен сообщениями в брокере qpid, и входящие адаптеры amqp, привязанные к этим очередям, будут получать сообщения, и я могу продолжить тест

Ошибка: очередь: «push.customer.arkona.controller.search» не найден в VirtualHost «по умолчанию».

    qpid-config.json:

    {   "name": "EmbeddedBroker",   "modelVersion": "2.0",   "storeVersion" : 1,   "authenticationproviders" : [ {
        "name" : "noPassword",
        "type" : "Anonymous",
        "secureOnlyMechanisms": []
            },
        {
          "name" : "passwordFile",
          "type" : "PlainPasswordFile",
          "path" : "${qpid.home_dir}${file.separator}src${file.separator}main${file.separator}resources${file.separator}password.properties",
          "secureOnlyMechanisms": []
        }    ],   "ports" : [
        {
          "name": "AMQP",
          "port": "${qpid.amqp_port}",
          "authenticationProvider": "passwordFile",
          "protocols": [
            "AMQP_0_10",
            "AMQP_0_8",
            "AMQP_0_9",
            "AMQP_0_9_1"
          ]
        }],
        "virtualhostnodes" : [ {
        "name" : "default",
        "type" : "JSON",
        "defaultVirtualHostNode" : "true",
        "virtualHostInitialConfiguration" : "${qpid.initial_config_virtualhost_config}",
         "storeType" : "DERBY"   
} 
] 
}

password.properties имеет

гость: гость

Я создал отдельный профиль для запуска моих тестов. это конфигурация rabbitmq. кроме этого у меня есть xml-файл контекста кролика, в котором определены все очереди, обмены.

@Configuration
@Profile("qpid")
public class QpidConfig {

    String amqpPort = "5672";

    //String qpidHomeDir = "complete";
    String configFileName = "src/main/resources/qpid-config.json";

    @Bean
    BrokerOptions brokerOptions() {

        File tmpFolder= Files.createTempDir();

        //small hack, because userDir is not same when running Application and ApplicationTest
        //it leads to some issue locating the files after, so hacking it here
        String userDir=System.getProperty("user.dir").toString();

        File file = new File(userDir);
        String homePath = file.getAbsolutePath();

        BrokerOptions brokerOptions=new BrokerOptions();

        brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.getAbsolutePath());
        brokerOptions.setConfigProperty("qpid.amqp_port",amqpPort);
        brokerOptions.setConfigProperty("qpid.home_dir", homePath);
        brokerOptions.setInitialConfigurationLocation(homePath + "/"+configFileName);

        return brokerOptions;
    }

    @SuppressWarnings("rawtypes")
    @Bean
    Broker broker() throws Exception {

            org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
        broker.startup(brokerOptions());
        return (Broker) broker;
    }

    private ConnectionFactory connectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        return factory;
    }

    @Bean(name ="rabbitConnectionFactory")
    public CachingConnectionFactory rabbitConnectionFactory(){
        return new CachingConnectionFactory(connectionFactory());
    }

    @Bean(name="rabbitTemplate")
    public RabbitTemplate rabbitTemplate(){
        return new RabbitTemplate(rabbitConnectionFactory());
    }

    @Bean(name ="arkonaHeaderMapper")
    public DefaultAmqpHeaderMapper syncerHeaderMapper() {
        DefaultAmqpHeaderMapper amqpHeaderMapper = DefaultAmqpHeaderMapper.inboundMapper();
        amqpHeaderMapper.setRequestHeaderNames("*");
        amqpHeaderMapper.setReplyHeaderNames("*");
        return amqpHeaderMapper;
    }


}

EDIT

МОЙ Кролик-context.xml

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:queue name="pull.appt.arkona.scheduler.adapter" />
    <rabbit:queue name="pull.appt.arkona.adapter.processor" />

    <rabbit:queue name="pull.customer.arkona.to.lookup" />
    <rabbit:queue name="pull.customer.arkona.lookup.processor" />

    <rabbit:queue name="pull.customer.arkona.scheduler.adapter" />

    <rabbit:queue name="pull.ro.arkona.to.lookup" />
    <rabbit:queue name="pull.ro.arkona.adapter.processor" />

    <rabbit:queue name="pull.ro.arkona.scheduler.adapter" />

    <rabbit:queue name="pull.closed.arkona.scheduler.adapter" />
    <rabbit:queue name="pull.parts.arkona.scheduler.adapter" />

    <rabbit:queue name="pull.closed.arkona.adapter.processor" />
    <rabbit:queue name="pull.parts.arkona.adapter.processor" />

    <rabbit:queue name="pull.vehicle.arkona.to.lookup" />
    <rabbit:queue name="pull.vehicle.arkona.lookup.processor" />

    <rabbit:direct-exchange name="dms.arkona.exchange" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="pull.appt.arkona.scheduler.adapter" key="pull.appt.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.appt.arkona.adapter.processor" key="pull.appt.arkona.adapter.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.customer.arkona.to.lookup" key="pull.customer.arkona.to.lookup.key"></rabbit:binding>
            <rabbit:binding queue="pull.customer.arkona.lookup.processor" key="pull.customer.arkona.lookup.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.customer.arkona.scheduler.adapter" key="pull.customer.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.ro.arkona.to.lookup" key="pull.ro.arkona.to.lookup.key"></rabbit:binding>
            <rabbit:binding queue="pull.ro.arkona.adapter.processor" key="pull.ro.arkona.adapter.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.ro.arkona.scheduler.adapter" key="pull.ro.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.vehicle.arkona.to.lookup" key="pull.vehicle.arkona.to.lookup.key"></rabbit:binding>
            <rabbit:binding queue="pull.vehicle.arkona.lookup.processor" key="pull.vehicle.arkona.lookup.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.closed.arkona.scheduler.adapter" key="pull.closed.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.closed.arkona.adapter.processor" key="pull.closed.arkona.adapter.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.parts.arkona.scheduler.adapter" key="pull.parts.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.parts.arkona.adapter.processor" key="pull.parts.arkona.adapter.processor.key"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>

1 Ответ

0 голосов
/ 16 мая 2018

Есть ли у вас RabbitAdmin в контексте вашего приложения? (Он обнаруживает очереди / обмены / привязки и объявляет их при установлении соединения).

Я только что проверил Образец Spring Integration AMQP с QPID 6.1.2, и все было в порядке ...

<!-- Infrastructure -->

<rabbit:connection-factory id="connectionFactory" host="xx.xx.xx.xx" virtual-host="default" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="si.test.queue" />

<rabbit:direct-exchange name="si.test.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="si.test.queue" key="si.test.binding" />
    </rabbit:bindings>
</rabbit:direct-exchange>

EDIT

У меня тоже нормально работает загрузочное приложение ...

@SpringBootApplication
public class So50364236Application {

    public static void main(String[] args) {
        SpringApplication.run(So50364236Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> template.convertAndSend("so50364236", "foo");
    }

    @Bean
    public Queue queue() {
        return new Queue("so50364236");
    }

    @RabbitListener(queues = "so50364236")
    public void listen(String in) {
        System.out.println(in);
    }

}

и

spring.rabbitmq.addresses=xx.x.x.x
spring.rabbitmq.virtual-host=default

и

2018-05-16 13:17:25.013  INFO 34714 --- [           main] com.example.So50364236Application        : Started So50364236Application in 1.151 seconds (JVM running for 1.579)
foo

И я вижу очередь на странице администратора брокера.

EDIT2

Вот еще одно загрузочное приложение, в котором очередь объявлена ​​в XML-файле; используя встроенный QPID 6.1.6 ...

qpid-config.json

{
    "name": "EmbeddedBroker",
    "modelVersion": "2.0",
    "storeVersion": 1,
    "authenticationproviders": [
        {
            "name": "noPassword",
            "type": "Anonymous",
            "secureOnlyMechanisms": []
        },
        {
            "name": "passwordFile",
            "type": "PlainPasswordFile",
            "path": "${qpid.home_dir}${file.separator}etc${file.separator}passwd",
            "secureOnlyMechanisms": []
        }
    ],
    "ports": [
        {
            "name": "AMQP",
            "port": "${qpid.amqp_port}",
            "authenticationProvider": "passwordFile",
            "protocols": [
                "AMQP_0_10",
                "AMQP_0_8",
                "AMQP_0_9",
                "AMQP_0_9_1"
            ]
        }
    ],
    "virtualhostnodes": [
        {
            "name": "default",
            "type": "JSON",
            "defaultVirtualHostNode": "true",
            "virtualHostInitialConfiguration": "${qpid.initial_config_virtualhost_config}",
            "storeType": "DERBY"
        }
    ]
}

config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:queue name="so50364236b" />

</beans>

application.properties

spring.rabbitmq.addresses=localhost:8888

Загрузочное приложение

@SpringBootApplication
@ImportResource("config.xml")
public class So50364236Application {

    public static void main(String[] args) {
        new SpringApplicationBuilder(So50364236Application.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> template.convertAndSend("so50364236b", "foo");
    }

    @Bean
    BrokerOptions brokerOptions() throws Exception {

        Path tmpFolder = Files.createTempDirectory("qpidWork");
        Path homeFolder = Files.createTempDirectory("qpidHome");
        File etc = new File(homeFolder.toFile(), "etc");
        etc.mkdir();
        FileOutputStream fos = new FileOutputStream(new File(etc, "passwd"));
        fos.write("guest:guest\n".getBytes());
        fos.close();

        BrokerOptions brokerOptions = new BrokerOptions();

        brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
        brokerOptions.setConfigProperty("qpid.amqp_port", "8888");
        brokerOptions.setConfigProperty("qpid.home_dir", homeFolder.toAbsolutePath().toString());
        Resource config = new ClassPathResource("qpid-config.json");
        brokerOptions.setInitialConfigurationLocation(config.getFile().getAbsolutePath());

        return brokerOptions;
    }

    @Bean
    Broker broker() throws Exception {
        org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
        broker.startup(brokerOptions());
        return broker;
    }

    @RabbitListener(queues = "so50364236b")
    public void listen(String in) {
        System.out.println(in);
    }

}
* * И тысяча сорок-девять
[Broker] BRK-1004 : Qpid Broker Ready
received: foo

Возможно, вы делаете что-то, из-за чего администратор загрузки не объявляется. Непонятно, зачем вы добавляете свою собственную фабрику соединений и шаблон; Вы тоже пытались добавить свой RabbitAdmin?

...