Windows Service / SQL Проблема параллелизма серверов - PullRequest
0 голосов
/ 04 августа 2020

У меня есть таблица базы данных, которая действует как очередь задач. Вставки в эту таблицу происходят в других местах (например, триггеры таблиц и т. Д. c.).

У меня есть многопоточная служба Windows, которая выполняет следующие действия:

  1. Используя хранимую процедуру, считывает таблицу очереди для строк, помеченных как 'todo', и помечает прочитанные строки как 'doing'.
  2. Обрабатывает эти строки.
  3. Помечает строки как 'done'.

Идеальная ситуация: Когда служба запущена, несколько процессоров / потоков будут обрабатывать уникальный пакет из myqueue строк.

Фактический: Бывают случаи, когда несколько процессоров / потоков извлекают одни и те же myqueue строки. Это означает, что строка myqueue может обрабатываться несколько раз.

Вопрос: Есть ли проблема с параллелизмом, которую я здесь упустил?

T- SQL Код:

CREATE TABLE myqueue (
    id varchar(20) NOT NULL,
    task_data varchar(255) NULL,
    status varchar(10) NULL,
PRIMARY KEY CLUSTERED ( id ASC )

CREATE PROCEDURE dbo.getNextInQueue @itemsToGet INT = 1000
AS
BEGIN
    IF OBJECT_ID('tempdb..#tmpnext') IS NULL
    BEGIN
        CREATE TABLE #tmpnext (
            id VARCHAR(20)
            ,PRIMARY KEY CLUSTERED (id)
            )
    END

    INSERT INTO #tmpnext (id)
    SELECT TOP (@itemsToGet) id
    FROM myqueue WITH (
            UPDLOCK
            ,READPAST
            )
    WHERE status = 'todo'

    UPDATE m
    SET m.status = 'doing'
    FROM (
        SELECT id
        FROM #tmpnext
        ) AS tmp
    INNER JOIN myqueue m ON m.id = tmp.id

    SELECT m.*
    FROM myqueue m WITH (NOLOCK)
    JOIN #tmpnext t ON t.id = m.id
END

C# Код:

public class QueueProcessorService : System.ServiceProcess.ServiceBase
{
    private System.Threading.Timer IntervalTimer;
    private ServiceParams serviceParams; // A POCO that reads arguments, returns defaults if not provided.
    private bool isStopped = false;
    
    private readonly object _mutex = new object();
    private int concurrentRunningProcessors = 0;
    
    private string ConnectionString => ConfigurationManager.AppSettings["ConnectionString"];
    
    protected override void OnStart(string[] args)
    {
        serviceParams = new ServiceParams(args); 
        
        IntervalTimer = new System.Threading.Timer(
                    callback: new System.Threading.TimerCallback(IntervalTimer_ElapsedAsync),
                    state: null,
                    dueTime: serviceParams.PeriodInMillisec,
                    period: serviceParams.PeriodInMillisec);
    }
    
    protected override void OnStop()
    {
        isStopped = true;
        IntervalTimer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
        
        while (concurrentRunningProcessors > 0)
        {
            // Wait until running processors finish
        };

        IntervalTimer.Dispose();
        IntervalTimer = null;
    }
    
    private async void IntervalTimer_ElapsedAsync(object state)
    {
        try
        {
            if (!isStopped && concurrentRunningProcessors <= serviceParams.MaxConcurrentProcessors)
            {
                lock (_mutex)
                {
                    concurrentRunningProcessors++;
                }
                try
                {
                    await ProcessTasksInQueue();
                }
                catch (Exception)
                {
                    throw;
                }
                finally
                {
                    if (concurrentRunningProcessors > 0)
                    {
                        lock (_mutex)
                        {
                            concurrentRunningProcessors--;
                        }
                    }
                }
            }
        }
        catch (Exception ex)
        {
            DealWithException(ex); // e.g. Logs, etc.
            this.Stop();
        }
    }
    
    private async Task ProcessTasksInQueue()
    {
        using (var conn = new SqlConnection(ConnectionString))
        {
            var cmd = conn.CreateCommand();
            cmd.CommandText = "dbo.getNextInQueue";
            cmd.CommandType = CommandType.StoredProcedure;
            cmd.Parameters.Add(new SqlParameter("@itemsToGet", SqlDbType.Int) { Value = serviceParams.ItemsToGet });
            
            var rowsRetrieved = new List<RowInTaskQueue>();
            
            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    var row = new RowInTaskQueue()
                    {
                        Id = reader.GetString(reader.GetOrdinal("id")),
                        Data = reader.GetString(reader.GetOrdinal("task_data")),
                        Status = reader.GetString(reader.GetOrdinal("status")),
                    };
                    rowsRetrieved.Add(row);
                }
            }
            
            ProcessTheRows(rowsRetrieved);
        }
    }
}
...