Возможен наилучший подход: параллельный или одиночный потребитель из Oracle JMS Queue - PullRequest
0 голосов
/ 17 сентября 2018

Требование: получить данные из Oracle AQJMS, обработать их и передать их в облачный поток.

Означает ли добавление MessageListener означает, что каждое сообщение будет обрабатываться в отдельном потоке. Мне нужны эти прослушиватели для непрерывного прослушивания очереди и храненияна обработку данных.

Решение: Ниже приводится суть кода, который выполняет эту обработку.Вопрос: означает ли Регистрация Messagelistener, что каждая запись обрабатывается в отдельном потоке, когда запись поступает в очередь.

Требуется ли пул соединений для очереди по отношению к реализации QueueSession:

Требование: Мне нужно реализовать несколькопотребитель очереди, который будет привязан к нескольким очередям JMS, обрабатывает сообщения и отправляет их в облако.

Я написал отдельный исполняемый файл для каждой очереди соединений, с которой нам нужно обрабатывать сообщения.

Мой вопрос заключается в том, как лучше всего это сделать.

Вот код, касающийся вопросов, который выполняет обработку.

public class MultiQueueConsumer implements Runnable {

    /*
     * private ISubscribeQueue subscribeQueue=SubScribeQueueImpl.INSTANCE; private
     * CloudSync cloudSync=AWSCloudSyncImpl.INSTANCE; private DataExtractor
     * dataExtractor=DataExtractorImpl.INSTANCE;
     */

    private ISubscribeQueue subscribeQueue;
    private DataExtractor dataExtractor;
    private DataTransformer dataTransformer;
    private CloudSync cloudSync;
    private String rmobInstance;

    @Inject
    public MultiQueueConsumer(ISubscribeQueue iSubscribeQueue, CloudSync cloudSync, DataExtractor dataExtractor,
            DataTransformer dataTransformer, @Assisted String rmobInstance) {
        this.subscribeQueue = iSubscribeQueue;
        this.cloudSync = cloudSync;
        this.dataExtractor = dataExtractor;
        this.rmobInstance = rmobInstance;
    }

    @Override
    public void run() {
        Optional<QueueMetaHolder> metaHolder = subscribeQueue.subscribe(rmobInstance);
        if (metaHolder.isPresent())
            processRecords(metaHolder.get());
    }

    private void processRecords(QueueMetaHolder metaHolder) {
        QueueReceiver queueReceiver = metaHolder.getQueueReceiver();
        try {
            System.out.println("Listening to queue" + " " + metaHolder.getConnection().getClientID());
        } catch (JMSException e1) {
            e1.printStackTrace();
        }

        try {

            queueReceiver.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    if (message != null) {
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            long changeId;
                            try {
                                changeId = Long.parseLong(textMessage.getText());

                                message.acknowledge();

                                McpClusteredDeltaCollection clusteredDeltaCollection = dataExtractor
                                        .getClusteredDataCollection(changeId, rmobInstance);
                                Optional<PayLoad> payload = dataTransformer
                                        .tranformResultSetToPayLoad(clusteredDeltaCollection);
                                boolean cloudStreamSyncStatus = cloudSync.syncToStream(payload.get());
                                boolean cloudSearchServerSyncStatus = cloudSync.syncToSearchServer(payload.get());


                            } catch (Exception e) {
                                e.printStackTrace();
                            }

                        }

                    }

                    else
                        System.out.println("Nothing recevied from JMS queue" + " " + message);
                }
            });
            // Message message =
            // queueReceiver.receive(TimeUnit.MILLISECONDS.toSeconds(2000));

        } catch (JMSException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...