Как удалить сообщения из очереди, когда приложение находилось в автономном режиме (Oracle Advanced Queue) - PullRequest
0 голосов
/ 03 февраля 2012

У меня 2 вопроса. Ниже приведен сценарий -

Существует 2 разных процесса Процесс А и Процесс Б. Процесс A помещает сообщение в очередь сообщений. Процесс B выводит сообщение из очереди сообщений.

1) Процесс B на некоторое время завершает работу, но Процесс A продолжает помещать сообщение в очередь. Когда Процесс B возвращается в рабочее состояние, как удалить из очереди сообщения в очереди, которые были отправлены Процессом A, когда Процесс B был отключен?

2) Очередь, которую я использую, - это очередь с несколькими потребителями, поскольку для удаления сообщения из очереди должно быть более 1 процесса. Причина дизайна заключается в том, что если один из процессов B умирает, другой процесс B может продолжать обрабатывать сообщение. В то же время, если 1 экземпляр Процесса B получил сообщение, он должен уведомить другой Процесс B, чтобы он не обрабатывал сообщение.

Я не могу найти образцы. Любая помощь очень ценится.

1 Ответ

0 голосов
/ 31 января 2013

Я только что завершил проект с довольно похожими требованиями.

Задача 1) Я создал таймер службы Windows, который периодически запускает службу WCF Restful. Служба WCF затем удаляла бы все в очереди (до 500 сообщений для каждого вызова). Все, что помещено в очередь, должно автоматически обрабатываться по порядку, поэтому, даже если этот таймер остановился после перезапуска, он взял бы с того места, где остановился.

Проблема 2) Я реплицировал данные из Oracle в CouchBase, поэтому у меня была временная метка для извлечения, когда процесс начался, и временная метка для уже сохраненных данных в CouchBase, если первая была старше последней, тогда она не будет сохранена. (Это должно было заботиться об условиях гонки).

В Oracle у меня также был триггер, который, когда что-то ставилось в очередь, копировал идентификатор и время в очереди во вторую таблицу. Периодически проверяется эта вторая таблица, и если элемент был исключен из очереди в таблице очередей, но вторая таблица не была обновлена, чтобы отразить это в течение определенного периода времени службой WCF, он будет повторно ставить в очередь данные, поскольку в процессе произошла ошибка .

В случае, если это полезно, вот пример службы restful wcf, использующей odp.net.

OracleAQQueue _queueObj;
OracleConnection _connObj;
_connString = ConfigurationManager.ConnectionStrings["connectionstring"].ToString();
_connObj = new OracleConnection(_connString);
_queueObj = new OracleAQQueue("QUEUENAME", _connObj);
_connObj.Open();

  int i = 0;
  bool messageAvailable = true;

  while (messageAvailable && i < 500)
  {
    OracleTransaction _txn = _connObj.BeginTransaction();
    //Makes dequeue part of transaction
    _queueObj.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
    _queueObj.DequeueOptions.ConsumerName = "CONSUMERNAME"
    try
    {
         //Wait  number of seconds for dequeue, default is forever
         _queueObj.DequeueOptions.Wait = 2;
         _queueObj.MessageType = OracleAQMessageType.Raw;
         _queueObj.DequeueOptions.ProviderSpecificType = true;
         OracleAQMessage _depMsq = _queueObj.Dequeue();
         var _binary = (OracleBinary)_depMsq.Payload;
         byte[] byteArray = _binary.Value;
         _txn.Commit();
     }
     catch (Exception ex)
     {
         //This catch will always fire when all messages have been dequeued
         messageAvailable = false;
         if (ex.Message.IndexOf("end-of-fetch during message dequeue") == -1)
            {
             //Actual error present. 
             log.Info("Problem occurred during dequeue process : " + ex.Message);
            }
     }
  }

    _queueObj.Dispose();
    _connObj.Close();
    _connObj.Dispose();
    _connObj = null;
...