У меня есть следующая задача: WSO2 прослушивает очередь RabbitMQ (amqp) для входящего сообщения, вызывает некоторую процедуру DB и отвечает обратно на указанную в очереди входящих сообщений (свойство reply_to) с указанным идентификатором Correlation (свойство correlation_id).Я использовал динамическую маршрутизацию сообщений с конечной точкой по умолчанию (названной RabbitMQ_DefEP) и CorrelationId. Я трачу некоторое время на сбор информации, чтобы сделать эту работу.Вот рабочий пример. Может быть полезным для других
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="RabbitMQRPCTest01" startOnLoad="true" transports="rabbitmq" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<log description="" level="custom">
<property name="*****Rabbit MQ Start" value="********************************* headers"/>
</log>
<log level="headers"/>
<log description="" level="custom">
<property name="*****Rabbit MQ" value="********************************* full"/>
</log>
<log level="full"/>
<log description="CorrId" level="custom">
<property expression="$header" name="***Header="/>
<property expression="$body" name="***Body="/>
</log>
<property description="Get Correlation Id" expression="get-property('transport', 'rabbitmq.message.correlation.id')" name="corrid" scope="default" type="STRING"/>
<property description="Get Reply To" expression="get-property('transport', 'RABBITMQ_REPLY_TO')" name="replto" scope="default" type="STRING"/>
<log level="custom">
<property expression="get-property('corrid')" name="***CORRELATION ID="/>
<property expression="get-property('replto')" name="***REPLY_TO="/>
</log>
<payloadFactory description="Set payload for procedure call" media-type="xml">
<format>
<p:CallPing xmlns:p="http://ws.wso2.org/dataservice">
<p:req>$1</p:req>
</p:CallPing>
</format>
<args>
<arg evaluator="xml" expression="$body//ns:text" xmlns:ns="http://ws.apache.org/commons/ns/payload"/>
</args>
</payloadFactory>
<property description="HTTP_METHOD" name="HTTP_METHOD POST" scope="axis2" type="STRING" value="POST"/>
<property description="SOAPAction" name="SOAPAction" scope="transport" type="STRING" value=""/>
<property description="messageType" name="messageType" scope="axis2" type="STRING" value="application/xml"/>
<log level="custom">
<property expression="$body" name="*****Procedure params"/>
</log>
<send>
<endpoint key="DMSB_CFTEP"/>
</send>
</inSequence>
<outSequence>
<log description="" level="custom">
<property name="*****Rabbit MQ Reply started" value="*********************************"/>
</log>
<log description="Full Log" level="full"/>
<header description="Build URI for AMQP" expression="fn:concat('rabbitmq:/AMQPProducerSample?rabbitmq.server.host.name=rabbitmq.ru&rabbitmq.server.port=5672&rabbitmq.server.user.name=monitor&rabbitmq.server.password=12345&rabbitmq.connection.factory=RabbitMQConnectionFactory&rabbitmq.exchange.name=&rabbitmq.queue.routing.key=', get-property('replto'))" name="To" scope="default"/>
<log description="" level="custom">
<property name="*****Rabbit MQ Reply END" value="*********************************"/>
</log>
<property description="Set Correlation Id" expression="get-property('corrid')" name="rabbitmq.message.correlation.id" scope="axis2" type="STRING"/>
<call>
<endpoint key="RabbitMQ_DefEP"/>
</call>
</outSequence>
<faultSequence>
<log level="custom">
<property expression="$body" name="****FAULT***************"/>
</log>
<log description="Log Props" level="full">
<property name="text" value="An unexpected error occured. Executing fault sequence"/>
<property expression="get-property('ERROR_MESSAGE')" name="ERROR_MESSAGE"/>
<property expression="get-property('ERROR_DETAIL')" name="detail"/>
<property expression="get-property('ERROR_EXCEPTION')" name="exception"/>
<property expression="get-property('ERROR_CODE')" name="ERROR_CODE"/>
</log>
</faultSequence>
</target>
<parameter name="rabbitmq.queue.name">esb</parameter>
<parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
</proxy>