У меня есть класс конфигурации JMS, где я настраиваю фабрику соединений для JMSListener
@Configuration
@EnableJms
@Slf4j
public class SqsQueueListenerConfigForBooking {
@Autowired
private SqsListenerServiceForBooking sqsListenerForBooking;
@Autowired
private BookingConfig bookingConfig;
@Autowired
private Sender kinesisSender;
@Autowired
private AwsCloudCredentialsProviderFactory awsCloudCredentialsProviderFactory;
@Lookup
protected FinishedSpanHandler getFinishedSpanHandler(String transactionId) {
return null;
}
@Bean
public DefaultMessageListenerContainer jmsListenerContainer(String transactionId) {
log.info("transactionID---"+transactionId);
AmazonSQS sqs = AmazonSQSClientBuilder.standard().withRegion(awsRegion)
.withCredentials(awsCloudCredentialsProviderFactory.getAWSCredentialsProvider())
.build();
SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqs);
ConnectionFactory tracingConnectionFactory = getConnectionFactoryWrappedWithTracing(sqsConnectionFactory,sqsListenerForBooking.getTransactionId());
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setConnectionFactory(tracingConnectionFactory);
dmlc.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
dmlc.setErrorHandler(error -> {
log.error("Error in listener!", error);
});
dmlc.setDestinationName(bookingRequestQueueName);
dmlc.setMessageListener(sqsListenerForBooking);
return dmlc;
}
private ConnectionFactory getConnectionFactoryWrappedWithTracing(SQSConnectionFactory sqsConnectionFactory,String transactionId) {
log.info("transactionId"+transactionId);
Reporter spanReporter = AsyncReporter.create(kinesisSender);
FinishedSpanHandler finishedHandler = getFinishedSpanHandler(transactionId);//beanFactory.getBean(FinishedSpanHandler.class,"");// Look for alternative
Tracing tracing = Tracing.newBuilder().addFinishedSpanHandler(finishedHandler).spanReporter(spanReporter).build();
MessagingTracing messagingTracing = MessagingTracing.newBuilder(tracing).build();
JmsTracing jmsTracing = JmsTracing.newBuilder(messagingTracing)
.remoteServiceName(bookingRequestQueueName)
.build();
ConnectionFactory tracingConnectionFactory = jmsTracing.connectionFactory(sqsConnectionFactory);
return tracingConnectionFactory;
}
}
Мой класс реализации MessageListener
@Component
@Scope("prototype")
@Slf4j public class SqsListenerServiceForBooking implements MessageListener {
@Override
public void onMessage(Message sqsQueueEventMessage) {
log.info("Listening.....");
try {
TextMessage sqsEvent = (TextMessage) sqsQueueEventMessage;<-- it has transactionId
JsonNode sqsEventNode = mapper.readTree(sqsEvent.getText().toString());
if (isRegistrationRequestEvent(sqsEventNode)) {
handleRegistrationRequest(sqsEventNode);
} else {
handleEventsFromBookingService(sqsEventNode);
}
} catch (JMSException e) {
log.info(e.toString());
} catch (JsonProcessingException e) {
log.info(e.toString());
}
}
Постановка проблемы sqsQueueEventMessage, который Я получаю в реализации слушателя дает транзакции ID, и я должен передать этот идентификатор в класс SqsQueueListenerConfigForBooking, который будет использоваться для другой службы.
Мне нужна помощь в реализации приведенного выше утверждения Спасибо