Требование: получить данные из Oracle AQJMS, обработать их и передать их в облачный поток.
Означает ли добавление MessageListener означает, что каждое сообщение будет обрабатываться в отдельном потоке. Мне нужны эти прослушиватели для непрерывного прослушивания очереди и храненияна обработку данных.
Решение: Ниже приводится суть кода, который выполняет эту обработку.Вопрос: означает ли Регистрация Messagelistener, что каждая запись обрабатывается в отдельном потоке, когда запись поступает в очередь.
Требуется ли пул соединений для очереди по отношению к реализации QueueSession:
Требование: Мне нужно реализовать несколькопотребитель очереди, который будет привязан к нескольким очередям JMS, обрабатывает сообщения и отправляет их в облако.
Я написал отдельный исполняемый файл для каждой очереди соединений, с которой нам нужно обрабатывать сообщения.
Мой вопрос заключается в том, как лучше всего это сделать.
Вот код, касающийся вопросов, который выполняет обработку.
public class MultiQueueConsumer implements Runnable {
/*
* private ISubscribeQueue subscribeQueue=SubScribeQueueImpl.INSTANCE; private
* CloudSync cloudSync=AWSCloudSyncImpl.INSTANCE; private DataExtractor
* dataExtractor=DataExtractorImpl.INSTANCE;
*/
private ISubscribeQueue subscribeQueue;
private DataExtractor dataExtractor;
private DataTransformer dataTransformer;
private CloudSync cloudSync;
private String rmobInstance;
@Inject
public MultiQueueConsumer(ISubscribeQueue iSubscribeQueue, CloudSync cloudSync, DataExtractor dataExtractor,
DataTransformer dataTransformer, @Assisted String rmobInstance) {
this.subscribeQueue = iSubscribeQueue;
this.cloudSync = cloudSync;
this.dataExtractor = dataExtractor;
this.rmobInstance = rmobInstance;
}
@Override
public void run() {
Optional<QueueMetaHolder> metaHolder = subscribeQueue.subscribe(rmobInstance);
if (metaHolder.isPresent())
processRecords(metaHolder.get());
}
private void processRecords(QueueMetaHolder metaHolder) {
QueueReceiver queueReceiver = metaHolder.getQueueReceiver();
try {
System.out.println("Listening to queue" + " " + metaHolder.getConnection().getClientID());
} catch (JMSException e1) {
e1.printStackTrace();
}
try {
queueReceiver.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
long changeId;
try {
changeId = Long.parseLong(textMessage.getText());
message.acknowledge();
McpClusteredDeltaCollection clusteredDeltaCollection = dataExtractor
.getClusteredDataCollection(changeId, rmobInstance);
Optional<PayLoad> payload = dataTransformer
.tranformResultSetToPayLoad(clusteredDeltaCollection);
boolean cloudStreamSyncStatus = cloudSync.syncToStream(payload.get());
boolean cloudSearchServerSyncStatus = cloudSync.syncToSearchServer(payload.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
else
System.out.println("Nothing recevied from JMS queue" + " " + message);
}
});
// Message message =
// queueReceiver.receive(TimeUnit.MILLISECONDS.toSeconds(2000));
} catch (JMSException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}