Неправильные значения AsyncLocal с потоком данных TPL - PullRequest
4 голосов
/ 01 октября 2019

Рассмотрим этот пример:

class Program

{
    private static readonly ITargetBlock<string> Mesh = CreateMesh();
    private static readonly AsyncLocal<string> AsyncLocalContext
        = new AsyncLocal<string>();

    static async Task Main(string[] args)
    {
        var tasks = Enumerable.Range(1, 4)
            .Select(ProcessMessage);
        await Task.WhenAll(tasks);

        Mesh.Complete();
        await Mesh.Completion;

        Console.WriteLine();
        Console.WriteLine("Done");
    }

    private static async Task ProcessMessage(int number)
    {
        var param = number.ToString();
        using (SetScopedAsyncLocal(param))
        {
            Console.WriteLine($"Before send {param}");
            await Mesh.SendAsync(param);
            Console.WriteLine($"After send {param}");
        }
    }

    private static IDisposable SetScopedAsyncLocal(string value)
    {
        AsyncLocalContext.Value = value;

        return new Disposer(() => AsyncLocalContext.Value = null);
    }

    private static ITargetBlock<string> CreateMesh()
    {
        var blockOptions = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = DataflowBlockOptions.Unbounded,
            EnsureOrdered = false,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };

        var block1 = new TransformBlock<string, string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block1 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");

            return input;
        }, blockOptions);

        var block2 = new TransformBlock<string, string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block2 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");

            return input;
        }, blockOptions);

        var block3 = new ActionBlock<string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block3 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");
        }, blockOptions);

        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};

        block1.LinkTo(block2, linkOptions);
        block2.LinkTo(block3, linkOptions);

        return new EncapsulatedActionBlock<string>(block1, block3.Completion);
    }
}

internal class EncapsulatedActionBlock<T> : ITargetBlock<T>
{
    private readonly ITargetBlock<T> _wrapped;

    public EncapsulatedActionBlock(ITargetBlock<T> wrapped, Task completion)
    {
        _wrapped = wrapped;
        Completion = completion;
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept) =>
        _wrapped.OfferMessage(messageHeader, messageValue, source, consumeToAccept);

    public void Complete() => _wrapped.Complete();

    public void Fault(Exception exception) => _wrapped.Fault(exception);

    public Task Completion { get; }
}

internal class Disposer : IDisposable
{
    private readonly Action _disposeAction;

    public Disposer(Action disposeAction)
    {
        _disposeAction = disposeAction
            ?? throw new ArgumentNullException(nameof(disposeAction));
    }

    public void Dispose()
    {
        _disposeAction();
    }
}

Результат выполнения будет примерно таким:

Before send 1
After send 1
Before send 2
After send 2
Before send 3
After send 3
Before send 4
After send 4
   Block1 [thread 9] Input: 3 - Context: 3.
   Block1 [thread 10] Input: 2 - Context: 1.
   Block1 [thread 8] Input: 4 - Context: 4.
   Block1 [thread 11] Input: 1 - Context: 2.
   Block2 [thread 9] Input: 2 - Context: 3.
   Block2 [thread 7] Input: 1 - Context: 2.
   Block2 [thread 10] Input: 3 - Context: 3.
   Block2 [thread 8] Input: 4 - Context: 4.
   Block3 [thread 11] Input: 4 - Context: 4.
   Block3 [thread 7] Input: 1 - Context: 2.
   Block3 [thread 9] Input: 3 - Context: 3.
   Block3 [thread 4] Input: 2 - Context: 3.

Done

Как видите, значение переданного контекста и сохраненное значение не всегдато же самое после перехода ко второму блоку TDF. Такое поведение приводит к неправильному использованию функции LogContext в нескольких средах Logging.

  1. Это ожидаемое поведение (пожалуйста, объясните почему)?
  2. Поток данных TPL каким-то образом портит контекст выполнения?

1 Ответ

2 голосов
/ 01 октября 2019

Чтобы понять, что происходит, вы должны понимать, как работают блоки потока данных. Внутри них нет заблокированных потоков, ожидающих прибытия сообщений. Обработка выполняется рабочими задачами. Давайте рассмотрим простой (и используемый по умолчанию) случай MaxDegreeOfParallelism = 1. Изначально рабочих задач нет. Когда сообщение отправляется асинхронно с SendAsync, та же самая задача, которая опубликовала сообщение, становится рабочей задачей и начинает обрабатывать сообщение. Если во время обработки первого сообщения будет опубликовано другое сообщение, произойдет что-то еще. Второе сообщение будет помещено в очередь ввода блока, и задание, которое его опубликовало, будет выполнено. Второе сообщение будет обработано рабочей задачей, которая обработала первое сообщение. Пока в очереди находятся сообщения, первоначальная рабочая задача будет выбирать их и обрабатывать их одно за другим. Если в какой-то момент больше нет буферизованных сообщений, рабочая задача завершится, и блок вернется в исходное состояние (ноль рабочих задач). Следующая SendAsync станет новой рабочей задачей и так далее. С MaxDegreeOfParallelism = 1 в любой момент времени может существовать только одна рабочая задача.

Давайте продемонстрируем это на примере. Ниже приведен ActionBlock, который подается с задержкой X и обрабатывает каждое сообщение с задержкой Y.

private static void ActionBlockTest(int sendDelay, int processDelay)
{
    Console.WriteLine($"SendDelay: {sendDelay}, ProcessDelay: {processDelay}");
    var asyncLocal = new AsyncLocal<int>();
    var actionBlock = new ActionBlock<int>(async i =>
    {
        await Task.Delay(processDelay);
        Console.WriteLine($"Processed {i}, Context: {asyncLocal.Value}");
    });
    Task.Run(async () =>
    {
        foreach (var i in Enumerable.Range(1, 5))
        {
            asyncLocal.Value = i;
            await actionBlock.SendAsync(i);
            await Task.Delay(sendDelay);
        }
    }).Wait();
    actionBlock.Complete();
    actionBlock.Completion.Wait();
}

Посмотрим, что произойдет, если мы отправим сообщения быстро и обработаем их медленно:

ActionBlockTest(100, 200); // .NET Core 3.0

SendDelay: 100, ProcessDelay: 200
Обработано 1, Контекст: 1
Обработано 2, Контекст: 1
Обработано 3, Контекст: 1
Обработано 4, Контекст: 1
Обработано 5, Контекст: 1

Контекст AsyncLocal остался прежним, поскольку одна и та же рабочая задача обработала все сообщения.

Теперь давайте отправим сообщения медленно и быстро обработаем их:

ActionBlockTest(200, 100); // .NET Core 3.0

SendDelay: 200, ProcessDelay: 100
Обработано 1, Контекст: 1
Обработано 2,Контекст: 2
Обработано 3, Контекст: 3
Обработано 4, Контекст: 4
Обработано 5, Контекст: 5

Контекст AsyncLocal отличается для каждого сообщения,потому что каждое сообщение было обработано другой рабочей задачей.

Моральный урок этой истории заключается в том, что каждыйSendAsync не гарантирует создание единого асинхронного рабочего процесса, который следует за сообщением до конца его пути, до конца конвейера. Поэтому класс AsyncLocal нельзя использовать для хранения окружающих данных для каждого сообщения.

...