Я создал класс, обрабатывающий параллельные заливки на том же DataSet
. У каждой заливки есть свой DataSet
, и он будет объединен с исходным в конце выполнения. Для обработки отношений таблицы данных можно создать последовательный список заливок, которые будут выполняться параллельно с другими заливками. Метод слияния будет ожидать завершения всех операций до sh и слияния результатов в исходный DataSet
(переданный в конструкторе)
public class FillParallelizer<TDataSet> : IDisposable
where TDataSet : DataSet, new()
{
private bool _seqFillStarted = false;
private IList<QueryObject> _queryObjList = new List<QueryObject>();
private Queue<FillOperation> _seqOperationBufferQueue = new Queue<FillOperation>();
public DataSet MasterDS { get; }
public FillParallelizer(DataSet masterDS)
{
MasterDS = masterDS ?? throw new ArgumentNullException(nameof(masterDS));
}
public FillParallelizer<TDataSet> ParallelFill(Action<TDataSet> methodAction)
{
if (methodAction == null) throw new ArgumentNullException(nameof(methodAction));
QueryObject queryObj = new QueryObject(methodAction);
_queryObjList.Add(queryObj);
return this;
}
public void Merge()
{
var taskList = _queryObjList.Select(x => x.Task).ToArray();
Task.WaitAll(taskList);
foreach (var item in _queryObjList)
{
MasterDS.Merge(item.TempDS);
}
}
public FillParallelizer<TDataSet> SequentialFill(Action<TDataSet> action)
{
if(!_seqFillStarted)
{
_seqFillStarted = true;
}
if (!_seqFillStarted) throw new ArgumentException("Sequential fill not started");
_seqOperationBufferQueue.Enqueue(new FillOperation(action));
return this;
}
public FillParallelizer<TDataSet> RunSequentialFill()
{
_seqFillStarted = false;
QueryObject queryObj = new QueryObject(new Queue<FillOperation>(_seqOperationBufferQueue));
_seqOperationBufferQueue.Clear();
_queryObjList.Add(queryObj);
return this;
}
public void Dispose()
{
if (_queryObjList == null || !_queryObjList.Any())
{
return;
}
foreach (var item in _queryObjList) item.Dispose();
}
class FillOperation
{
public Action<TDataSet> Action { get; }
public FillOperation(Action<TDataSet> action)
{
Action = action ?? throw new ArgumentNullException(nameof(action));
}
public void Run(TDataSet tempDS)
{
if (tempDS == null) throw new ArgumentNullException(nameof(tempDS));
Action(tempDS);
}
}
class QueryObject : IDisposable
{
public TDataSet TempDS { get; }
public Queue<FillOperation> FillOperationList { get; }
public Task Task { get; }
public QueryObject(Queue<FillOperation> fillOperationList)
{
TempDS = new TDataSet();
FillOperationList = fillOperationList ?? throw new ArgumentNullException(nameof(fillOperationList));
Task =
Task
.Run
(() =>
{
foreach (FillOperation op in FillOperationList) op.Run(TempDS);
}
);
}
public QueryObject(Action<TDataSet> action)
: this(new Queue<FillOperation>(new FillOperation[] { new FillOperation(action) }))
{
}
public void Dispose()
{
Task.Dispose();
}
}
}
Пример использования:
using(MyDataSet myDS = new MyDataSet())
using (FillParallelizer<MyDataSet> parallelizer = new FillParallelizer<MyDataSet>(myDS))
{
parallelizer
.ParallelFill(ds => MyDAO.FillTable1(ds))
.ParallelFill(ds => MyDAO.FillTable2(ds))
.SequentialFill(ds => MyDAO.FillTable3(ds))
.SequentialFill(ds => MyDAO.FillTable4(ds))
.SequentialFill(ds => MyDAO.FillTable5(ds))
.RunSequentialFill()
.ParallelFill(ds => MyDAO.FillTable6(ds))
.ParallelFill(ds => MyDAO.FillTable7(ds))
.Merge();
//All fills completed
}
Методы заполнения 3, 4, 5 выполняются последовательно, но параллельно с другими заливками, для обработки отношений между таблицами