Первоначально я использовал asyn c await для каждого из этих методов, и каждый из вызовов выполнялся асинхронно, но мы выяснили, что они не совпадают, тогда есть место для ошибок.
Итак, я подумал, что мы должны поставить все эти асинхронные задачи в очередь и отправить их в отдельном потоке, но я хочу знать, какие у нас есть варианты? Я столкнулся с 'SemaphoreSlim'.
SemaphoreSlim
действительно ограничивает асинхронный код для выполнения по одному и является допустимой формой взаимного исключения, Однако, поскольку вызовы «вне последовательности» могут вызвать ошибки, то SemaphoreSlim
является , а не подходящим решением, поскольку оно не гарантирует FIFO.
В более общем смысле примитив синхронизации отсутствует гарантирует FIFO, потому что это может вызвать проблемы из-за побочных эффектов, таких как конвои блокировки. С другой стороны, естественно, что структуры данных должны быть строго FIFO.
Таким образом, вам нужно использовать собственную очередь FIFO, а не неявную очередь выполнения. Channels - это хорошая, производительная, asyn c -совместимая очередь, но поскольку вы работаете на более старой версии C # /. NET, BlockingCollection<T>
будет работать:
public sealed class ExecutionQueue
{
private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();
public ExecutionQueue() => Complete = Task.Run(() => ProcessQueueAsync());
public Task Completion { get; }
public void Complete() => _queue.CompleteAdding();
private async Task ProcessQueueAsync()
{
foreach (var value in _queue.GetConsumingEnumerable())
await value();
}
}
Единственная хитрость часть с этой настройкой, как поставить в очередь работу. С точки зрения кода, ставящего в очередь работу, они хотят знать, когда лямбда выполняется, а не когда лямбда находится в очереди. С точки зрения метода очереди (который я называю Run
), метод должен завершить свою возвращенную задачу только после выполнения лямбды. Таким образом, вы можете написать метод очереди примерно так:
public Task Run(Func<Task> lambda)
{
var tcs = new TaskCompletionSource<object>();
_queue.Add(async () =>
{
// Execute the lambda and propagate the results to the Task returned from Run
try
{
await lambda();
tcs.TrySetResult(null);
}
catch (OperationCanceledException ex)
{
tcs.TrySetCanceled(ex.CancellationToken);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
});
return tcs.Task;
}
Этот метод не так совершенен, как мог бы быть. Если задача завершается с более чем одним исключением (это нормально для параллельного кода), сохраняется только первое (это нормально для асинхронного кода c). Там также крайний случай вокруг обработки OperationCanceledException
. Но этот код достаточно хорош для большинства случаев.
Теперь вы можете использовать его так:
public static ExecutionQueue _queue = new ExecutionQueue();
public async Task SendModuleDataToDSAsync(Module parameters)
{
var tasks1 = new List<Task>();
var tasks2 = new List<Task>();
foreach (var setting in Module.param)
{
Task job1 = _queue.Run(() => SaveModule(setting));
tasks1.Add(job1);
Task job2 = _queue.Run(() => SaveModule(GetAdvancedData(setting)));
tasks2.Add(job2);
}
await Task.WhenAll(tasks1);
await Task.WhenAll(tasks2);
}