Я только что завершил проект с довольно похожими требованиями.
Задача 1)
Я создал таймер службы Windows, который периодически запускает службу WCF Restful. Служба WCF затем удаляла бы все в очереди (до 500 сообщений для каждого вызова). Все, что помещено в очередь, должно автоматически обрабатываться по порядку, поэтому, даже если этот таймер остановился после перезапуска, он взял бы с того места, где остановился.
Проблема 2)
Я реплицировал данные из Oracle в CouchBase, поэтому у меня была временная метка для извлечения, когда процесс начался, и временная метка для уже сохраненных данных в CouchBase, если первая была старше последней, тогда она не будет сохранена. (Это должно было заботиться об условиях гонки).
В Oracle у меня также был триггер, который, когда что-то ставилось в очередь, копировал идентификатор и время в очереди во вторую таблицу. Периодически проверяется эта вторая таблица, и если элемент был исключен из очереди в таблице очередей, но вторая таблица не была обновлена, чтобы отразить это в течение определенного периода времени службой WCF, он будет повторно ставить в очередь данные, поскольку в процессе произошла ошибка .
В случае, если это полезно, вот пример службы restful wcf, использующей odp.net.
OracleAQQueue _queueObj;
OracleConnection _connObj;
_connString = ConfigurationManager.ConnectionStrings["connectionstring"].ToString();
_connObj = new OracleConnection(_connString);
_queueObj = new OracleAQQueue("QUEUENAME", _connObj);
_connObj.Open();
int i = 0;
bool messageAvailable = true;
while (messageAvailable && i < 500)
{
OracleTransaction _txn = _connObj.BeginTransaction();
//Makes dequeue part of transaction
_queueObj.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
_queueObj.DequeueOptions.ConsumerName = "CONSUMERNAME"
try
{
//Wait number of seconds for dequeue, default is forever
_queueObj.DequeueOptions.Wait = 2;
_queueObj.MessageType = OracleAQMessageType.Raw;
_queueObj.DequeueOptions.ProviderSpecificType = true;
OracleAQMessage _depMsq = _queueObj.Dequeue();
var _binary = (OracleBinary)_depMsq.Payload;
byte[] byteArray = _binary.Value;
_txn.Commit();
}
catch (Exception ex)
{
//This catch will always fire when all messages have been dequeued
messageAvailable = false;
if (ex.Message.IndexOf("end-of-fetch during message dequeue") == -1)
{
//Actual error present.
log.Info("Problem occurred during dequeue process : " + ex.Message);
}
}
}
_queueObj.Dispose();
_connObj.Close();
_connObj.Dispose();
_connObj = null;