Мой вариант использования:
- подписаться на Q1 и читать сообщения в пакетах указанного размера.
- Передать коллекцию прочитанных сообщений для обработки.
- опубликовать собранные сообщения в Q2 и подтвердить сообщение в Q1 после успешного подтверждения публикации q2.
код
@Component
public class EPPQ2Subscriber {
private static final Logger LOGGER = LoggerFactory.getLogger(EPPQ2Subscriber.class);
@Autowired
RabbitMqConfig rabbitMqConfig;
@Autowired
AppConfig appConfig;
List<Message> messageList = new ArrayList<Message>();
List<Long> diliveryTag = new ArrayList<Long>();
/**
* Method is listener's receive message method , invoked when there is message ready to read
* @param message - Domain object of message encapsulated
* @param channel - rabitmq client channel
* @param messageId - @TODO Delete it later.
* @param messageProperties - amqp message properties contains message properties such as delivery tag etc..
*/
@RabbitListener(id="messageListener",queues = "#{rabbitMqConfig.getSubscriberQueueName()}",containerFactory="queueListenerContainer")
public void receiveMessage(Message message, Channel channel, @Header("id") String messageId,
MessageProperties messageProperties) {
LOGGER.info("Result:" + message.getClass() + ":" + message.toString());
if(messageList.size() <= appConfig.getSubscriberChunkSize() ) {
messageList.add(message);
diliveryTag.add(messageProperties.getDeliveryTag());
} else {
// call the service here to decrypt, read pan, call danger to scrub, encrypt pan and re-pack them in message again.
//after this branch messageList should have scrubbed and encrypted message objects ready to publish.
// Here is call for publish and ack messages..
}
}
}
@Component
@Configuration
public class TopicConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(TopicConfiguration.class);
@Autowired
RabbitMqConfig rabbitMqConfig;
@Autowired EPPQ2Publisher eppQ2Publisher;
/**
* Caching connection factory
* @return CachingConnectionFactory
*/
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMqConfig.getPublisherHosts(), rabbitMqConfig.getPublisherPort());
connectionFactory.setUsername(rabbitMqConfig.getPublisherUsername());
connectionFactory.setPassword(rabbitMqConfig.getPublisherPassword());
return connectionFactory;
}
/**
* Bean RabbitTemplate
* @return RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new
ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
rabbitTemplate.setRetryTemplate(retryTemplate);
/* rabbitTemplate.setExchange(rabbitMqConfig.getPublisherTopic());
rabbitTemplate.setRoutingKey(rabbitMqConfig.getRoutingKey());*/
rabbitTemplate.setConfirmCallback((correlation, ack, reason) ->
if(correlation != null ) {
LOGGER.info("Received " + (ack ? " ack " : " nack ") +
"for correlation: " + correlation);
if(ack) {
// this is confirmation received..
// here is code to ack Q1. correlation.getId and ack
eppQ2Publisher.ackMessage(new
Long(correlation.getId().toString()));
} else {
// no confirmation received and no need to do any
thing for retry..
}
}
});
rabbitTemplate.setReturnCallback((message, replyCode,
replyText, exchange, routingKey) ->
{
LOGGER.error("Returned: " + message + "\nreplyCode: " +
replyCode+ "\nreplyText: " + replyText +
"\nexchange/rk: " + exchange + "/" + routingKey);
});
return rabbitTemplate;
}
/**
* Bean Jackson2JsonMessageConverter
* @return Jackson2JsonMessageConverter
*/
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
public interface EPPQ2Publisher {
public void sendMessage(Message msg,Long deliveryTag);
public void sendMessages(List<Message> msgList, Channel channel, List<Long> deliveryTagList);
public void ackMessage(Long deliveryTag);
}
@Component
public class EPPQ2PublisherImpl implements EPPQ2Publisher{
@Autowired
RabbitMqConfig rabbitMqConfig;
@Autowired
private RabbitTemplate rabbitTemplate;
private Channel channel;
/**
* Method sendMessage for sending individual scrubbed and encrypted message to publisher queue (Q2).
* @param msg - message domain object
* @param deliveryTag - is message delivery tag.
*/
@Override
public void sendMessage(Message msg,Long deliveryTag) {
rabbitTemplate.convertAndSend(rabbitMqConfig.getPublisherTopic(), rabbitMqConfig.getRoutingKey(), msg,new CorrelationData(deliveryTag.toString()));
}
/**
* sendMessages for sending list of scrubbed and encrypted messages to publisher queue (Q2)
* @param msgList - is list of scrubbed and encrypted messages
* @param channel - is ampq client channel
* @param deliveryTagList - is list of incoming message delivery tags.
*/
@Override
public void sendMessages(List<Message> msgList, Channel channel, List<Long>deliveryTagList) {
if(this.channel == null) {
this.channel = channel;
}
for (int i = 0 ; i < msgList.size(); i ++) {
sendMessage(msgList.get(i),deliveryTagList.get(i));
}
}
/**
* Method ackMessage for sending acknowledgement to subscriber Q1
* @param deliveryTag - is deliveryTag for each individual message.
*/
@Override
public void ackMessage(Long deliveryTag) {
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
e.printStackTrace();
}
}
org.springframework.amqp.rabbit.connection.CachingConnectionFactory Создание кэшированного канала кролика из AMQChannel (amqp: //dftp_subscriber@10.15.190.18: 5672 / hydra.services, 2)
Я ожидал быть dftp_publisher, и мне кажется, что моя конфигурация темы не введена должным образом.
Журнал ошибок:
org.springframework.amqp.rabbit.core.RabbitTemplate [0; 39m: выполнение обратного вызова RabbitTemplate $$ Lambda $ 285/33478758 на RabbitMQ Канал: Кэшируемый канал кролика: AMQChannel (amqp: //dftp_subscri8: 56.1: 56.1.15) hydra.services, 2), conn: Proxy @ 1dc339f Совместное подключение к кролику: SimpleConnection @ 2bd7c8 [делегат = amqp: //dftp_subscriber@10.15.190.18: 5672 / hydra.services, localPort = 55553]
org.springframework.amqp.rabbit.core.RabbitTemplate [0; 39m: Публикация сообщения (Body: '{"HEADER": {"RETRY_COUNT": 0, "PUBLISH_EVENT_TYPE": "AUTH"}, "PAYLOAD": {"MTI ":" 100" , "MTI_REQUEST": "100", "PAN": "6011000000000000", "PROCCODE": "00", "PROCCODE_REQUEST": "00", "FROM_ACCOUNT": "00", "TO_ACCOUNT": "00", "TRANSACTION_AMOUNT": "000000000100", "TRANSMISSION_MMDDHHMMSS": "0518202930", "СТАН": "000001", "LOCALTIME_HHMMSS": "010054", "LOCALDATE_YYMMDD": "180522", "EXPIRATION_DATE_YYMM": "2302 », "MERCHANT_TYPE": "5311", "ACQUIRING_COUNTRY_CODE": "840", "POS_ENTRY_MODE": "02", "POS_PIN_ENTRY_CAPABILITIES": "0", "FUNCTION_CODE": "100", "ACQUIRING_ID_CODE": "000000", "FORWARDING_ID_CODE": "000000", "RETRIEVAL_REFERENCE_NUMBER": "1410N644D597", "MERCHANT_NUMBER": "601100000000596", "CARD_ACCEPTOR_NAME": "Discover Acq Simulator", "CARD_ACCEPTOR_CITY": "Ривервудз", "CARD_ACCEPTOR_STATE": "IL", "CARD_ACCEPTOR_COUNTRY": "840", "CARD_ACCEPTOR_COUNTRY_3NUMERIC": "840", "NRID": "123456789012345", "TRANSACTION_CURRENCY_CODE": "840", "POS_TERMINAL_ATTENDANCE_INDICA TOR ":" 0" , "POS_PARTIAL_APPROVAL_INDICATOR": "0", "POS_TERMINAL_LOCATION_INDICATOR": "0", "POS_TRANSACTION_STATUS_INDICATOR": "0", "POS_ECOMMERCE_TRAN_INDICATOR": "0", "POS_TYPE_OF_TERMINAL_DEVICE": "0", "POS_CARD_PRESENCE_INDICATOR" : "0", "POS_CARD_CAPTURE_CAPABILITIES_INDICATOR": "0", "POS_TRANSACTION_SECURITY_INDICATOR": "0", "POS_CARD_DATA_TERMINAL_INPUT_CAPABILITY_INDICATOR": "С", "POS_CARDHOLDER_PRESENCE_INDICATOR": "0", "DFS_POS_DATA": "0000000000C00", "GEODATA_STREET_ADDRESS":» 2500 LAKE COOK ROAD "," GEODATA_POSTAL_CODE ":" 600150000 "," GEODATA_COUNTY_CODE ":" 840 "," GEODATA_STORE_NUMBER ":" 10001 "," GEODATA_MALL_NAME ":" ОТКЛЮЧИТЬ ФИНАНСОВЫЙ SRID " ":" 123459875 "," VERSION_INDICATOR ":" 03141 "}} 'MessageProperties [headers = { TypeId = com.discover.dftp.scrubber.domain.Message}, contentType = application / json, contentEncoding = UTF-8, contentLength = 1642, deliveryMode = PERSISTENT, priority = 0, deliveryTag = 0]) на бирже [hydra.hash2Syphon.exc], routingKey = [100]
org.springframework.amqp.rabbit.connection.CachingConnectionFactory $ DefaultChannelCloseLogger [0; 39m: отключение канала: ошибка канала; Метод протокола: #method (reply-code = 403, reply-text = ACCESS_REFUSED - невозможно опубликовать на внутреннем обмене 'hydra.hash2Syphon.exc' в vhost 'hydra.services', class-id = 60, method-id = 40)
РЕДАКТИРОВАТЬ 2.
@Component
@Configuration
public class ListenerContainerFactory {
static final Logger logger = LoggerFactory.getLogger(ListenerContainerFactory.class);
@Autowired
RabbitMqConfig rabbitMqConfig;
@Autowired
EPPQ2Subscriber receiver;
@Autowired
EPPQ2ChanelAwareSubscriber receiverChanel;
public ListenerContainerFactory(ConfigurableApplicationContext ctx) {
printContainerStartMsg();
}
private void printContainerStartMsg() {
logger.info("----------- Scrubber Container Starts --------------");
}
@Bean
public SimpleRabbitListenerContainerFactory queueListenerContainer(AbstractConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
connectionFactory.setAddresses(rabbitMqConfig.getSubscriberHosts());
connectionFactory.setVirtualHost("hydra.services");
connectionFactory.setPort(rabbitMqConfig.getSubscriberPort());
connectionFactory.setUsername(rabbitMqConfig.getSubscriberUsername());
connectionFactory.setPassword(rabbitMqConfig.getSubscriberPassword());
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(errorHandler());
return factory;
}
@Bean
MessageListenerAdapter listenerAdapter(EPPQ2Subscriber receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
/*@Bean
MessageListenerAdapter listenerAdapterWithChanel(EPPQ2ChanelAwareSubscriber receiverChanel) {
return new MessageListenerAdapter(receiverChanel);
}*/
@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(fatalExceptionStrategy());
}
@Bean
public ScrubberFatalExceptionStrategy fatalExceptionStrategy() {
return new ScrubberFatalExceptionStrategy();
}
}
и последняя конфигурация темы.
@Component
@Configuration
public class TopicConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(TopicConfiguration.class);
@Autowired
RabbitMqConfig rabbitMqConfig;
@Autowired EPPQ2Publisher eppQ2Publisher;
/**
* Bean Queue
* @return Queue
*/
@Bean
Queue queue() {
return new Queue(rabbitMqConfig.getPublisherQueueName(), false);
}
/**
* Bean TopicExchage
* @return TopicExchage
*/
@Bean
TopicExchange exchange() {
return new TopicExchange(rabbitMqConfig.getPublisherTopic());
}
/**
* Bean BindingBuilder
* @param queue - Queue
* @param exchange - TopicExchange
* @return BindingBuilder
*/
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(rabbitMqConfig.getRoutingKey());
}
/**
* Caching connection factory
* @return CachingConnectionFactory
*/
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMqConfig.getPublisherHosts(),
rabbitMqConfig.getPublisherPort());
connectionFactory.setUsername(rabbitMqConfig.getPublisherUsername());
connectionFactory.setPassword(rabbitMqConfig.getPublisherPassword());
return connectionFactory;
}
/**
* Bean RabbitTemplate
* @return RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
rabbitTemplate.setRetryTemplate(retryTemplate);
/* rabbitTemplate.setExchange(rabbitMqConfig.getPublisherTopic());
rabbitTemplate.setRoutingKey(rabbitMqConfig.getRoutingKey());*/
rabbitTemplate.setConfirmCallback((correlation, ack, reason) -> {
if(correlation != null ) {
LOGGER.info("Received " + (ack ? " ack " : " nack ") + "for correlation: " + correlation);
if(ack) {
// this is confirmation received..
// here is code to ack Q1. correlation.getId() and ack
eppQ2Publisher.ackMessage(new
Long(correlation.getId().toString()));
} else {
// no confirmation received and no need to do any
}
}
});
rabbitTemplate.setReturnCallback(
(message, replyCode, replyText,
exchange, routingKey) ->
{
LOGGER.error("Returned: " + message + "\nreplyCode: " +
replyCode
+ "\nreplyText: " + replyText + "\nexchange/rk: " +
exchange + "/" + routingKey);
});
return rabbitTemplate;
}
/**
* Bean Jackson2JsonMessageConverter
* @return Jackson2JsonMessageConverter
*/
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
}