Parallel.ForEach
предназначен для параллелизма данных (т. Е. Обработки строк по 100 КБ с использованием всех 8 ядер), а не одновременных операций. По сути, это проблема pub / sub и async , если не проблема конвейера. В этом случае CPU ничего не может сделать, просто запустите асинхронные операции и дождитесь их завершения.
.NET обрабатывает это, начиная с .NET 4.5 через классы Dataflow, а в последнее время - пространство имен нижнего уровня System.Threading.Channel.
В простейшей форме вы можете создать ActionBlock <> , который принимает буфер и целевое соединение и публикует данные. Допустим, вы используете этот метод для отправки данных на сервер:
async Task MyBulkCopyMethod(string connectionString,DataTable data)
{
using(var bcp=new SqlBulkCopy(connectionString))
{
//Set up mappings etc.
//....
await bcp.WriteToServerAsync(data);
}
}
Вы можете использовать это с классом ActionBlock с настроенной степенью параллелизма. Классы потока данных, такие как ActionBlock, имеют свои собственные входные и, при необходимости, выходные буферы, поэтому нет необходимости создавать отдельную очередь:
class DataMessage
{
public string Connection{get;set;}
public DataTable Data {get;set;}
}
...
var options=new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 50,
BoundedCapacity = 8
};
var block=new ActionBlock<DataMessage>(msg=>MyBulkCopyMethod(msg.Connection,msg.Data, options);
Мы можем начатьотправка сообщений в блок сейчас. Установив емкость на 8, мы гарантируем, что входной буфер не будет заполняться большими сообщениями, если блок слишком медленный. MaxDegreeOfParallelism
контролирует, как могут выполняться операции одновременно. Допустим, мы хотим отправить одни и те же данные на многие серверы:
var data=.....;
var servers=new[]{connString1, connString2,....};
var messages= from sv in servers
select new DataMessage{ ConnectionString=sv,Table=data};
foreach(var msg in messages)
{
await block.SendAsync(msg);
}
//Tell the block we are done
block.Complete();
//Await for all messages to finish processing
await block.Completion;
Retries
Одна из возможностей повторных попыток - использовать цикл повторения в рабочей функции. Лучше было бы использовать другой блок и публиковать там сообщения о сбоях.
var block=new ActionBlock<DataMessage>(async msg=> {
try {
await MyBulkCopyMethod(msg.Connection,msg.Data, options);
}
catch(SqlException exc) when (some retry condition)
{
//Post without awaiting
retryBlock.Post(msg);
});
Когда исходный блок завершится, мы хотим сказать, чтобы блок повторов также завершился, независимо от того, что:
block.Completion.ContinueWith(_=>retryBlock.Complete());
Теперь мы можем дождаться завершения retryBlock
.
Этот блок может иметь меньший DOP и, возможно, задержку между попытками:
var retryOptions=new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 5
};
var retryBlock=new ActionBlock<DataMessage>(async msg=>{
await Task.Delay(1000);
try {
await MyBulkCopyMethod(msg.Connection,msg.Data, options);
}
catch (Exception ....)
{
...
}
});
Этот шаблон можно повторить для создания нескольких уровней повторных попыток или различных условий. Он также может быть использован для создания различных приоритетных работников, предоставляя больший DOP высокоприоритетным работникам или большую задержку низкоприоритетным работникам