Выборочная очередь несвязанных сообщений в Oracle Advanced Queuing - PullRequest
2 голосов
/ 20 октября 2010

Этот вопрос относится к снятию очереди сообщений в Oracle Streams Advanced Queuing .

Мне нужно убедиться, что сообщения, связанные друг с другом, обрабатываются последовательно.

Например, предположим, что очередь заполнена четырьмя сообщениями, имеющими связанное с бизнесом поле, называемое ссылкой на транзакцию (txn_ref), и два из сообщений (1,3) принадлежат одной и той же транзакции (000001):

id | txn_ref | 
---+---------+
 1 | 000001  |
 2 | 000002  |
 3 | 000001  |
 4 | 000003  |

Предположим также, что я запускаю 4 потока / процесса, которые хотят удалить из этой очереди очередь.Должно произойти следующее:

  1. поток 1 удаляет сообщение # 1
  2. поток 2 удаляет сообщение # 2
  3. поток 3 удаляет сообщение # 4 (поскольку сообщение # 3относится к # 1, а # 1 еще не завершено).
  4. поток 4 блокирует ожидание сообщения
  5. поток 1 фиксирует свою работу для сообщения # 1
  6. поток 4 (или, возможно, поток 1) удаляет сообщение № 3.

Первоначально я думал, что я могу достичь этого с условием удаления, когда ENQ_TIME (время постановки в очередь) не позднее, чем любой другой ENQ_TIME всех сообщенийкоторые имеют тот же TXN_REF.Но моя проблема в том, как сослаться на TXN_REF сообщения, которое я еще не выбрал, чтобы выбрать его.например,

// Java API
String condition = "ENQ_TIME = (select min(ENQ_TIME) from AQ_TABLE1 where ??";
dequeueOption.setCondition(condition);

Можно ли здесь добиться того, чего я хочу?

Ответы [ 2 ]

2 голосов
/ 26 октября 2010

Чтобы ответить на ваш прямой вопрос, это можно сделать с помощью поля correlation (в таблице называемого CORRID), которое предназначено для этой цели.

Таким образом, в очереди вы должны использовать метод AQMessageProperties.setCorrelation() со значением TXN_REF в качестве параметра.Тогда в вашем состоянии вы бы сделали что-то вроде этого:

// Java API
String condition = "tab.ENQ_TIME = (select min(AQ_TABLE1.ENQ_TIME) from AQ_TABLE1 self where tab.CORRID=AQ_TABLE1.CORRID)";
dequeueOption.setCondition(condition);
1 голос
/ 26 октября 2010

Стратегия, которую вы можете попробовать, если это возможно, использует группы сообщений.Документация Oracle 1002 * описывает ее кратко, но я нашел эту статью о мире жаб гораздо более полезнойПо сути, вы настраиваете таблицу очередей для обработки всех сообщений, отправленных одновременно, как одной «группы».При снятии очереди только один пользователь может одновременно исключать из «группы» сообщений.

...