Как установить x-opt-offset при установлении соединения с концентратором событий, используя AMQP, чтобы избежать воспроизведения сообщения - PullRequest
0 голосов
/ 18 мая 2019

Мое приложение подключается к концентратору событий Azure для получения и обработки сообщений. Я вижу, что каждый раз, когда я перезапускаю свое приложение, все сообщения в течение срока хранения воспроизводятся. Я прочитал о смещении, чтобы избежать этой проблемы, и у меня есть метод, который устанавливает соединение с концентратором событий Azure следующим образом:

    MessageConsumer connect() {
        // set up JNDI context
        BatchEventHubConfig batchEventHubConfig = //MAP CONTAINING CONFIG
        String queueName = "EventHub"
        String connectionFactoryName = "SBCF"
        //Long offset = batchAccountManager.batchStorageManager.batchJobMsgCheckpointService.get(batchEventHubConfig.namespace, batchEventHubConfig.getMessageQueueAddress(partitionInx, true))?.offset
        Hashtable<String, String> hashtable = new Hashtable<>()
        hashtable.put("connectionfactory.${connectionFactoryName}", batchEventHubConfig.getAMQPConnectionURI())
        hashtable.put("queue.${queueName}", batchEventHubConfig.getMessageQueueAddress(partitionInx))
        //hashtable.put("apache.org:selector-filter:string", "amqp.annotation.x-opt-offset > '${offset}'")
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory")
        Context context = new InitialContext(hashtable)

        ConnectionFactory factory = (ConnectionFactory) context.lookup(connectionFactoryName)
        queue = (Destination) context.lookup(queueName)
        connection = factory.createConnection(batchEventHubConfig.sasPolicyName, batchEventHubConfig.sasPolicyKey)
        connection.setExceptionListener(new BatchExceptionListener(eventHubConnection: this))

        connection.start()
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
        messageConsumer = session.createConsumer(queue)
        messageConsumer.setMessageListener(messageListener)
        messageConsumer
    }

Закомментированный код для смещения был тем, что я пытался прочитать здесь: https://azure.github.io/amqpnetlite/articles/azure_eventhubs.html

Как правильно установить смещение, чтобы сообщения не воспроизводились при перезапуске приложения?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...