Чтобы понять, что происходит, вы должны понимать, как работают блоки потока данных. Внутри них нет заблокированных потоков, ожидающих прибытия сообщений. Обработка выполняется рабочими задачами. Давайте рассмотрим простой (и используемый по умолчанию) случай MaxDegreeOfParallelism = 1
. Изначально рабочих задач нет. Когда сообщение отправляется асинхронно с SendAsync
, та же самая задача, которая опубликовала сообщение, становится рабочей задачей и начинает обрабатывать сообщение. Если во время обработки первого сообщения будет опубликовано другое сообщение, произойдет что-то еще. Второе сообщение будет помещено в очередь ввода блока, и задание, которое его опубликовало, будет выполнено. Второе сообщение будет обработано рабочей задачей, которая обработала первое сообщение. Пока в очереди находятся сообщения, первоначальная рабочая задача будет выбирать их и обрабатывать их одно за другим. Если в какой-то момент больше нет буферизованных сообщений, рабочая задача завершится, и блок вернется в исходное состояние (ноль рабочих задач). Следующая SendAsync
станет новой рабочей задачей и так далее. С MaxDegreeOfParallelism = 1
в любой момент времени может существовать только одна рабочая задача.
Давайте продемонстрируем это на примере. Ниже приведен ActionBlock
, который подается с задержкой X и обрабатывает каждое сообщение с задержкой Y.
private static void ActionBlockTest(int sendDelay, int processDelay)
{
Console.WriteLine($"SendDelay: {sendDelay}, ProcessDelay: {processDelay}");
var asyncLocal = new AsyncLocal<int>();
var actionBlock = new ActionBlock<int>(async i =>
{
await Task.Delay(processDelay);
Console.WriteLine($"Processed {i}, Context: {asyncLocal.Value}");
});
Task.Run(async () =>
{
foreach (var i in Enumerable.Range(1, 5))
{
asyncLocal.Value = i;
await actionBlock.SendAsync(i);
await Task.Delay(sendDelay);
}
}).Wait();
actionBlock.Complete();
actionBlock.Completion.Wait();
}
Посмотрим, что произойдет, если мы отправим сообщения быстро и обработаем их медленно:
ActionBlockTest(100, 200); // .NET Core 3.0
SendDelay: 100, ProcessDelay: 200
Обработано 1, Контекст: 1
Обработано 2, Контекст: 1
Обработано 3, Контекст: 1
Обработано 4, Контекст: 1
Обработано 5, Контекст: 1
Контекст AsyncLocal
остался прежним, поскольку одна и та же рабочая задача обработала все сообщения.
Теперь давайте отправим сообщения медленно и быстро обработаем их:
ActionBlockTest(200, 100); // .NET Core 3.0
SendDelay: 200, ProcessDelay: 100
Обработано 1, Контекст: 1
Обработано 2,Контекст: 2
Обработано 3, Контекст: 3
Обработано 4, Контекст: 4
Обработано 5, Контекст: 5
Контекст AsyncLocal
отличается для каждого сообщения,потому что каждое сообщение было обработано другой рабочей задачей.
Моральный урок этой истории заключается в том, что каждыйSendAsync
не гарантирует создание единого асинхронного рабочего процесса, который следует за сообщением до конца его пути, до конца конвейера. Поэтому класс AsyncLocal
нельзя использовать для хранения окружающих данных для каждого сообщения.