У меня есть таблица базы данных, которая действует как очередь задач. Вставки в эту таблицу происходят в других местах (например, триггеры таблиц и т. Д. c.).
У меня есть многопоточная служба Windows, которая выполняет следующие действия:
- Используя хранимую процедуру, считывает таблицу очереди для строк, помеченных как
'todo'
, и помечает прочитанные строки как 'doing'
. - Обрабатывает эти строки.
- Помечает строки как
'done'
.
Идеальная ситуация: Когда служба запущена, несколько процессоров / потоков будут обрабатывать уникальный пакет из myqueue
строк.
Фактический: Бывают случаи, когда несколько процессоров / потоков извлекают одни и те же myqueue
строки. Это означает, что строка myqueue
может обрабатываться несколько раз.
Вопрос: Есть ли проблема с параллелизмом, которую я здесь упустил?
T- SQL Код:
CREATE TABLE myqueue (
id varchar(20) NOT NULL,
task_data varchar(255) NULL,
status varchar(10) NULL,
PRIMARY KEY CLUSTERED ( id ASC )
CREATE PROCEDURE dbo.getNextInQueue @itemsToGet INT = 1000
AS
BEGIN
IF OBJECT_ID('tempdb..#tmpnext') IS NULL
BEGIN
CREATE TABLE #tmpnext (
id VARCHAR(20)
,PRIMARY KEY CLUSTERED (id)
)
END
INSERT INTO #tmpnext (id)
SELECT TOP (@itemsToGet) id
FROM myqueue WITH (
UPDLOCK
,READPAST
)
WHERE status = 'todo'
UPDATE m
SET m.status = 'doing'
FROM (
SELECT id
FROM #tmpnext
) AS tmp
INNER JOIN myqueue m ON m.id = tmp.id
SELECT m.*
FROM myqueue m WITH (NOLOCK)
JOIN #tmpnext t ON t.id = m.id
END
C# Код:
public class QueueProcessorService : System.ServiceProcess.ServiceBase
{
private System.Threading.Timer IntervalTimer;
private ServiceParams serviceParams; // A POCO that reads arguments, returns defaults if not provided.
private bool isStopped = false;
private readonly object _mutex = new object();
private int concurrentRunningProcessors = 0;
private string ConnectionString => ConfigurationManager.AppSettings["ConnectionString"];
protected override void OnStart(string[] args)
{
serviceParams = new ServiceParams(args);
IntervalTimer = new System.Threading.Timer(
callback: new System.Threading.TimerCallback(IntervalTimer_ElapsedAsync),
state: null,
dueTime: serviceParams.PeriodInMillisec,
period: serviceParams.PeriodInMillisec);
}
protected override void OnStop()
{
isStopped = true;
IntervalTimer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
while (concurrentRunningProcessors > 0)
{
// Wait until running processors finish
};
IntervalTimer.Dispose();
IntervalTimer = null;
}
private async void IntervalTimer_ElapsedAsync(object state)
{
try
{
if (!isStopped && concurrentRunningProcessors <= serviceParams.MaxConcurrentProcessors)
{
lock (_mutex)
{
concurrentRunningProcessors++;
}
try
{
await ProcessTasksInQueue();
}
catch (Exception)
{
throw;
}
finally
{
if (concurrentRunningProcessors > 0)
{
lock (_mutex)
{
concurrentRunningProcessors--;
}
}
}
}
}
catch (Exception ex)
{
DealWithException(ex); // e.g. Logs, etc.
this.Stop();
}
}
private async Task ProcessTasksInQueue()
{
using (var conn = new SqlConnection(ConnectionString))
{
var cmd = conn.CreateCommand();
cmd.CommandText = "dbo.getNextInQueue";
cmd.CommandType = CommandType.StoredProcedure;
cmd.Parameters.Add(new SqlParameter("@itemsToGet", SqlDbType.Int) { Value = serviceParams.ItemsToGet });
var rowsRetrieved = new List<RowInTaskQueue>();
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
var row = new RowInTaskQueue()
{
Id = reader.GetString(reader.GetOrdinal("id")),
Data = reader.GetString(reader.GetOrdinal("task_data")),
Status = reader.GetString(reader.GetOrdinal("status")),
};
rowsRetrieved.Add(row);
}
}
ProcessTheRows(rowsRetrieved);
}
}
}