Является ли заполнение разных DataTables одного и того же DataSet потокобезопасной операцией? - PullRequest
2 голосов
/ 08 апреля 2020

У меня есть много методов, заполняющих разные DataTables в одном и том же DataSet:

public override void FillMethod1(MyDataSet ds)
{
    try
    {
        using (SqlDataAdapter adapter = new SqlDataAdapter(query, connectionObj))
        {
            using (SqlCommandBuilder adapterSCB = new SqlCommandBuilder(adapter))
            {
                adapter.Fill(ds.MyTable);
            }
        }
    }
    catch (Exception e)
    {
        //Exception handling
    }
}

public override void FillMethod2(MyDataSet ds)
{
    try
    {
        using (SqlDataAdapter adapter = new SqlDataAdapter(query, connectionObj))
        {
            using (SqlCommandBuilder adapterSCB = new SqlCommandBuilder(adapter))
            {
                adapter.Fill(ds.MyTable2);
            }
        }
    }
    catch (Exception e)
    {
        //Exception handling
    }
}

[....]

using(MyDataSet ds = new MyDataSet())
{
    FillMethod1(ds);
    FillMethod2(ds);
    [...]
}

И я бы хотел распараллелить эти операции, используя Task:

using(MyDataSet ds = new MyDataSet())
{
    Task fill1 = Task.Run(() => FillMethod1(ds));
    Task fill2 = Task.Run(() => FillMethod2(ds));
    [...]

    Task.WaitAll(fill1, fill2, [...]);
}

После некоторых исследований Я обнаружил, что DataSet не является поточно-ориентированным, но безопасен ли он при работе с другими DataTable?

1 Ответ

0 голосов
/ 10 апреля 2020

Я создал класс, обрабатывающий параллельные заливки на том же DataSet. У каждой заливки есть свой DataSet, и он будет объединен с исходным в конце выполнения. Для обработки отношений таблицы данных можно создать последовательный список заливок, которые будут выполняться параллельно с другими заливками. Метод слияния будет ожидать завершения всех операций до sh и слияния результатов в исходный DataSet (переданный в конструкторе)

public class FillParallelizer<TDataSet> : IDisposable
    where TDataSet : DataSet, new()
{
    private bool _seqFillStarted = false;
    private IList<QueryObject> _queryObjList = new List<QueryObject>();
    private Queue<FillOperation> _seqOperationBufferQueue = new Queue<FillOperation>();

    public DataSet MasterDS { get; }

    public FillParallelizer(DataSet masterDS)
    {
        MasterDS = masterDS ?? throw new ArgumentNullException(nameof(masterDS));
    }

    public FillParallelizer<TDataSet> ParallelFill(Action<TDataSet> methodAction)
    {
        if (methodAction == null) throw new ArgumentNullException(nameof(methodAction));

        QueryObject queryObj = new QueryObject(methodAction);
        _queryObjList.Add(queryObj);

        return this;
    }

    public void Merge()
    {
        var taskList = _queryObjList.Select(x => x.Task).ToArray();
        Task.WaitAll(taskList);

        foreach (var item in _queryObjList)
        {
            MasterDS.Merge(item.TempDS);
        }
    }

    public FillParallelizer<TDataSet> SequentialFill(Action<TDataSet> action)
    {
        if(!_seqFillStarted)
        {
            _seqFillStarted = true;
        }

        if (!_seqFillStarted) throw new ArgumentException("Sequential fill not started");

        _seqOperationBufferQueue.Enqueue(new FillOperation(action));
        return this;
    }

    public FillParallelizer<TDataSet> RunSequentialFill()
    {
        _seqFillStarted = false;
        QueryObject queryObj = new QueryObject(new Queue<FillOperation>(_seqOperationBufferQueue));
        _seqOperationBufferQueue.Clear();
        _queryObjList.Add(queryObj);

        return this;
    }

    public void Dispose()
    {
        if (_queryObjList == null || !_queryObjList.Any())
        {
            return;
        }

        foreach (var item in _queryObjList) item.Dispose();
    }

    class FillOperation
    {
        public Action<TDataSet> Action { get; }

        public FillOperation(Action<TDataSet> action)
        {
            Action = action ?? throw new ArgumentNullException(nameof(action));
        }

        public void Run(TDataSet tempDS)
        {
            if (tempDS == null) throw new ArgumentNullException(nameof(tempDS));
            Action(tempDS);
        }
    }

    class QueryObject : IDisposable
    {
        public TDataSet TempDS { get; }
        public Queue<FillOperation> FillOperationList { get; }
        public Task Task { get; }

        public QueryObject(Queue<FillOperation> fillOperationList)
        {
            TempDS = new TDataSet();
            FillOperationList = fillOperationList ?? throw new ArgumentNullException(nameof(fillOperationList));
            Task =
                Task
                    .Run
                    (() =>
                    {
                        foreach (FillOperation op in FillOperationList) op.Run(TempDS);
                    }
                    );
        }

        public QueryObject(Action<TDataSet> action)
            : this(new Queue<FillOperation>(new FillOperation[] { new FillOperation(action) }))
        {
        }

        public void Dispose()
        {
            Task.Dispose();
        }
    }
}

Пример использования:

using(MyDataSet myDS = new MyDataSet())
using (FillParallelizer<MyDataSet> parallelizer = new FillParallelizer<MyDataSet>(myDS))
{
    parallelizer
        .ParallelFill(ds => MyDAO.FillTable1(ds))
        .ParallelFill(ds => MyDAO.FillTable2(ds))

        .SequentialFill(ds => MyDAO.FillTable3(ds))
        .SequentialFill(ds => MyDAO.FillTable4(ds))
        .SequentialFill(ds => MyDAO.FillTable5(ds))
        .RunSequentialFill()

        .ParallelFill(ds => MyDAO.FillTable6(ds))
        .ParallelFill(ds => MyDAO.FillTable7(ds))

        .Merge();

    //All fills completed
}

Методы заполнения 3, 4, 5 выполняются последовательно, но параллельно с другими заливками, для обработки отношений между таблицами

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...