Как создать новый контекст для каждой операции asyn c и использовать его в поточно-ориентированном режиме для хранения некоторых данных - PullRequest
4 голосов
/ 28 апреля 2020

Я работаю над чем-то вроде контекстного кэширования и немного застрял в поточной безопасности ...

Допустим, у меня есть следующий код:

public class AsynLocalContextualCacheAccessor : IContextualCacheAccessor
{
    private static readonly AsyncLocal<CacheScopesManager> _rCacheContextManager = new AsyncLocal<CacheScopesManager>();

    public AsynLocalContextualCacheAccessor()
    {
    }

    public CacheScope Current
    { 
        get
        {
            if (_rCacheContextManager.Value == null)
                _rCacheContextManager.Value = new CacheScopesManager();

            return _rCacheContextManager.Value.Current;
        }
    } 
}

public class CacheScopesManager
{
    private static readonly AsyncLocal<ImmutableStack<CacheScope>> _scopesStack = new AsyncLocal<ImmutableStack<CacheScope>>(OnValueChanged);

    public CacheScopesManager()
    {
        CacheScope contextualCache = _NewScope();

        _scopesStack.Value = ImmutableStack.Create<CacheScope>();
        _scopesStack.Value = _scopesStack.Value.Push(contextualCache);
    }

    public CacheScope Current
    {
        get
        {
            if (_scopesStack.Value.IsEmpty)
                return null;

            CacheScope current = _scopesStack.Value.Peek();
            if (current.IsDisposed)
            {
                _scopesStack.Value = _scopesStack.Value.Pop();
                return Current;
            }

            // Create a new scope if we entered the new physical thread in the same logical thread
            // in order to update async local stack and automatically have a new scope per every logically new operation
            int currentThreadId = Thread.CurrentThread.ManagedThreadId;
            if (currentThreadId != current.AcquiredByThread)
            {
                current = _NewScope();
                _scopesStack.Value = _scopesStack.Value.Push(current);
            }

            return current;
        }
    }

    private static void OnValueChanged(AsyncLocalValueChangedArgs<ImmutableStack<CacheScope>> args)
    {
        // Manual is not interesting to us.
        if (!args.ThreadContextChanged)
            return;

        ImmutableStack<CacheScope> currentStack = args.CurrentValue;
        ImmutableStack<CacheScope> previousStack = args.PreviousValue;

        int threadId = Thread.CurrentThread.ManagedThreadId; 

        int threadIdCurrent = args.CurrentValue?.Peek().AcquiredByThread ?? -1;
        int threadIdPrevious = args.PreviousValue?.Peek().AcquiredByThread ?? -1;

        // Be sure in disposing of the scope
        // This situation means a comeback of the previous execution context, in case if in the previous scope Current was used.
        if (currentStack != null && previousStack != null
            && currentStack.Count() > previousStack.Count())
            currentStack.Peek().Dispose();
    }
}

И я пытаюсь выполнить следующий тест:

    [TestMethod]
    [TestCategory(TestCategoryCatalogs.UnitTest)]
    public async Task AsyncLocalCacheManagerAccessor_request_that_processed_by_more_than_by_one_thread_is_threadsafe()
    {
        IContextualCacheAccessor asyncLocalAccessor = new AsynLocalContextualCacheAccessor();
        Task requestAsyncFlow = Task.Run(async () =>
        {
            string key1 = "key1";
            string value1 = "value1";

            string key2 = "key2";
            string value2 = "value2";

            CacheScope scope1 = asyncLocalAccessor.Current;

            string initialKey = "k";
            object initialVal = new object();

            scope1.Put(initialKey, initialVal);
            scope1.TryGet(initialKey, out object result1).Should().BeTrue();
            result1.Should().Be(initialVal);

            var parallel1 = Task.Run(async () =>
            {
                await Task.Delay(5);
                var cache = asyncLocalAccessor.Current;
                cache.TryGet(initialKey, out object result2).Should().BeTrue();
                result2.Should().Be(initialVal);

                cache.Put(key1, value1);
                await Task.Delay(10);

                cache.Items.Count.Should().Be(1);
                cache.TryGet(key1, out string result11).Should().BeTrue();
                result11.Should().Be(value1);
            });

            var parallel2 = Task.Run(async () =>
            {
                await Task.Delay(2);
                var cache = asyncLocalAccessor.Current;

                cache.StartScope();

                cache.TryGet(initialKey, out object result3).Should().BeTrue();
                result3.Should().Be(initialVal);

                cache.Put(key2, value2);
                await Task.Delay(15);

                cache.Items.Count.Should().Be(1);
                cache.TryGet(key2, out string result21).Should().BeTrue();
                result21.Should().Be(value2);
            });

            await Task.WhenAll(parallel1, parallel2);

            // Here is an implicit dependency from Synchronization Context, and in most cases
            // the next code will be handled by a new thread, that will cause a creation of a new scope,
            // as well as for any other await inside any async operation,  which is quite bad:( 

            asyncLocalAccessor.Current.Items.Count.Should().Be(1);
            asyncLocalAccessor.Current.TryGet(initialKey, out object result4).Should().BeTrue();
            result4.Should().Be(initialVal);
        });

        await requestAsyncFlow;

        asyncLocalAccessor.Current.Items.Count.Should().Be(0);
    }

И на самом деле этот тест зеленый, но есть одна (или более) проблема. Итак, что я пытаюсь достичь, это создать стек областей для каждой новой операции asyn c (если к текущей области обращались), и когда эта операция завершится, мне нужно успешно вернуться к предыдущему стеку. Я сделал это на основе текущего идентификатора потока (потому что я не нашел другого способа сделать это автоматически, но мне не нравилось мое решение), но если было выполнено продолжение операции asyn c не в начальном потоке (неявная зависимость от текущего SynchronizationContext), а в любом другом, тогда это вызывает создание новой области видимости, что очень плохо, как для меня.

Я был бы рад, если бы кто-то может подсказать, как это сделать правильно, большое спасибо! :)

UPD 1. Код обновлен для добавления static для каждого поля AsyncLocal, так как значение каждого AsyncLocal получается из ExecutionContext.GetLocalValue(), который является stati c, поэтому не stati c AsyncLocal просто избыточное давление памяти.

UPD 2. Спасибо, @weichch за ответ, поскольку комментарий может быть большим, я просто добавил дополнительную информацию прямо к вопросу. Итак, в моем случае logi c с инкапсулированным AsyncLocal и тем, что может сделать клиент моего кода - он вызывает только Current на IContextualCacheAccessor, который получит экземпляр объекта в AsyncLocal<CacheScopesManager>, AsyncLocal используется здесь только для того, чтобы иметь один экземпляр CacheScopesManager на логический запрос и делиться им по этому запросу, аналогично жизненному циклу области Io C -Container, но жизненный цикл такого объекта определяется с момента создания объекта до конец асинхронного потока c, в котором был создан этот объект. Или давайте подумаем о ASP NET Core, где у нас есть IHttpContext, IHttpContext, кажется, не является неизменным, но все еще используется как AsyncLocal через IHttpContextAccessor, не так ли? Подобно этому был разработан CacheScopesManager.

Таким образом, если клиентский код для получения текущего CacheScope может вызывать Current только на IContextualCacheAccessor, то в случае AsyncLocal реализация IContextualCacheAccessor стека вызовов попадет в следующий код:

public CacheScope Current
{
    get
    {
        if (_scopesStack.Value.IsEmpty)
            return null;

        CacheScope current = _scopesStack.Value.Peek();
        if (current.IsDisposed)
        {
            _scopesStack.Value = _scopesStack.Value.Pop();
            return Current;
        }

        // Create a new scope if we entered the new physical thread in the same logical thread
        // in order to update async local stack and automatically have a new scope per every logically new operation
        int currentThreadId = Thread.CurrentThread.ManagedThreadId;
        if (currentThreadId != current.AcquiredByThread)
        {
            current = _NewScope();
            _scopesStack.Value = _scopesStack.Value.Push(current);
        }

        return current;
    }
}

, и если другой поток решил использовать Current, это приведет к созданию новой области, и поскольку ImmutableStack<CacheScope> - это AsyncLocal, мы сохраняем стек предыдущего асинхронного * 1070. * вытекает из любых изменений, что означает, что когда мы вернемся к нему, стек будет в порядке без каких-либо повреждений (наверняка, если хаки не использовались). Все это было сделано для того, чтобы сделать стек областей безопасным, а не «AsyncLocal». Итак, ваш код

async Task Method1()
{
    Cache.Push(new CacheScope { Value = "Method1" });

    await Task.WhenAll(Method2(), Method3());

    Cache.Pop();
}

async Task Method2()
{
    await Task.Delay(10);

    var scope = Cache.CurrentStack.Peek();
    scope.Value = "Method2";

    Console.WriteLine($"Method2 - {scope.Value}");
}

async Task Method3()
{
    await Task.Delay(10);

    var scope = Cache.CurrentStack.Peek();

    Console.WriteLine($"Method3 - {scope.Value}");
}

в случае, если мой метод доступа используется, не вызовет мутации в потоке asyn c, которая будет отражена в другом (и добавление данных в область действия предыдущего асинхронного кода). c поток, до смены нити - нормально для меня). Но есть одна проблема, на самом деле, цель этого CacheScope состоит в том, чтобы иметь некоторое хранилище, которое перебирает логический запрос и кэширует некоторые данные, и эти данные ограничиваются CacheScope и будут извлечены из ссылочной памяти, как только сфера закончится. И я хочу минимизировать создание таких областей, что означает, что, если код выполнялся последовательно, не должно быть никаких причин для создания новой области, даже если продолжение какой-либо асинхронной операции c произошло в другом потоке, потому что логически код все еще «последовательный», и можно использовать одну и ту же область видимости для такого «последовательного» кода. Пожалуйста, поправьте меня, если я где-то ошибаюсь.

Но ваш ответ и объяснение действительно полезны и наверняка защитят других от ошибок. Кроме того, это помогло мне понять, что Стивен имел в виду:

Если вы выполните go по этому маршруту, я рекомендую написать множество и множество модульных тестов.

мой Engli sh плохой, и я подумал, что «маршрут» означает «ссылка на статью», теперь понимаю, что это довольно «путь» в этом контексте.

UPD 3. Добавлен код CacheScope для лучшей картинки.

public class CacheScope : IDisposableExtended
{
    private ICacheScopesManager _scopeManager;
    private CacheScope _parentScope;

    private Dictionary<string, object> _storage = new Dictionary<string, object>();


    internal CacheScope(Guid id, int boundThreadId, ICacheScopesManager scopeManager, 
        CacheScope parentScope)
    {
        _scopeManager = scopeManager.ThrowIfArgumentIsNull(nameof(scopeManager));

        Id = id;
        AcquiredByThread = boundThreadId;

        _parentScope = parentScope;
    }

    public Guid Id { get; }

    public int AcquiredByThread { get; }

    public IReadOnlyCollection<object> Items => _storage?.Values;

    public bool IsDisposed { get; private set; } = false;

    public bool TryExpire<TItem>(string key, out TItem expiredItem)
    {
        _AssertInstanceIsDisposed();

        key.ThrowIfArgumentIsNull(nameof(key));

        expiredItem = default(TItem);

        try
        {
            expiredItem = (TItem)_storage[key];
        }
        catch (KeyNotFoundException)
        {
            // Even if item is present in parent scope it cannot be expired from inner scope.
            return false;
        }

        _storage.Remove(key);

        return true;
    }


    public TItem GetOrPut<TItem>(string key, Func<string, TItem> putFactory)
    {
        _AssertInstanceIsDisposed();

        key.ThrowIfArgumentIsNull(nameof(key));
        putFactory.ThrowIfArgumentIsNull(nameof(putFactory));

        TItem result;

        try
        {
            result = (TItem)_storage[key];
        }
        catch (KeyNotFoundException)
        {
            if (_parentScope != null && _parentScope.TryGet(key, out result))
                return result;

            result = putFactory(key);

            _storage.Add(key, result);
        }

        return result;
    }

    public void Put<TItem>(string key, TItem item)
    {
        _AssertInstanceIsDisposed();

        key.ThrowIfArgumentIsNull(nameof(key));

        _storage[key] = item;

        // We are not even thinking about to change the parent scope here,
        // because parent scope should be considered by current as immutable.
    }

    public bool TryGet<TItem>(string key, out TItem item)
    {
        _AssertInstanceIsDisposed();

        key.ThrowIfArgumentIsNull(nameof(key));

        item = default(TItem);

        try
        {
            item = (TItem)_storage[key];
        }
        catch (KeyNotFoundException)
        {
            return _parentScope != null && _parentScope.TryGet(key, out item);
        }

        return true;
    }

    public void Dispose()
    {
        if (IsDisposed)
            return;

        Dictionary<string, object> localStorage = Interlocked.Exchange(ref _storage, null);
        if (localStorage == null)
        {
            // that should never happen but Dispose in general is expected to be safe to call so... let's obey the rules
            return;
        }

        foreach (var item in localStorage.Values)
            if (item is IDisposable disposable)
                disposable.Dispose();

        _parentScope = null;
        _scopeManager = null;

        IsDisposed = true;
    }

    public CacheScope StartScope() => _scopeManager.CreateScope(this);
}

Ответы [ 2 ]

4 голосов
/ 29 апреля 2020

Ваш код действительно борется с тем, как работает AsyncLocal<T>. Установка в геттере, попытка управлять областями вручную, наличие асин * локального менеджера c для асин c локального типа и код с использованием обработчика изменений - все проблематично c.

I Поверьте, все это на самом деле, чтобы попытаться справиться с тем фактом, что CacheScope не является неизменным. Лучший способ решить эту проблему - сделать CacheScope правильным неизменным объектом. Тогда все остальное станет более или менее естественным.

Я считаю, что часто проще написать отдельный static API для неизменяемых объектов, который более "asyn c локально-дружественный". Например:

public class ImplicitCache
{
  private static readonly AsyncLocal<ImmutableStack<(string, object)>> _asyncLocal = new AsyncLocal<ImmutableStack<(string, object)>>();

  private static ImmutableStack<(string, object)> CurrentStack
  {
    get => _asyncLocal.Current ?? ImmutableStack.Create<ImmutableDictionary<string, object>>();
    set => _asyncLocal.Current = value.IsEmpty ? null : value;
  }

  // Separate API:

  public static IDisposable Put(string key, object value)
  {
    if (key == null)
      throw new InvalidOperationException();
    CurrentStack = CurrentStack.Push((key, value));
    return new Disposable(() => CurrentStack = CurrentStack.Pop());
  }

  public static bool TryGet(string key, out object value)
  {
    var result = CurrentStack.Reverse().FirstOrDefault(x => x.Item1 == key);
    value = result.Item2;
    return result.Item1 != null;
  }
}

Использование:

public async Task AsyncLocalCacheManagerAccessor_request_that_processed_by_more_than_by_one_thread_is_threadsafe()
{
  Task requestAsyncFlow = Task.Run(async () =>
  {
    string key1 = "key1";
    string value1 = "value1";

    string key2 = "key2";
    string value2 = "value2";

    string initialKey = "k";
    object initialVal = new object();

    using var dispose1 = ImplicitCache.Put(initialKey, initialVal);
    ImplicitCache.TryGet(initialKey, out object result1).Should().BeTrue();
    result1.Should().Be(initialVal);

    var parallel1 = Task.Run(async () =>
    {
      await Task.Delay(5);
      ImplicitCache.TryGet(initialKey, out object result2).Should().BeTrue();
      result2.Should().Be(initialVal);

      using var dispose2 = ImplicitCache.Put(key1, value1);
      await Task.Delay(10);

      ImplicitCache.TryGet(key1, out string result11).Should().BeTrue();
      result11.Should().Be(value1);
    });

    var parallel2 = Task.Run(async () =>
    {
      await Task.Delay(2);

      ImplicitCache.TryGet(initialKey, out object result3).Should().BeTrue();
      result3.Should().Be(initialVal);

      using var disose3 = ImplicitCache.Put(key2, value2);
      await Task.Delay(15);

      ImplicitCache.TryGet(key2, out string result21).Should().BeTrue();
      result21.Should().Be(value2);
    });

    await Task.WhenAll(parallel1, parallel2);

    ImplicitCache.TryGet(initialKey, out object result4).Should().BeTrue();
    result4.Should().Be(initialVal);
  });

  await requestAsyncFlow;

  ImplicitCache.TryGet(initialKey, out _).Should().BeFalse();
}
2 голосов
/ 01 мая 2020

Я думаю, @StephenCleary не говорит, что изменяемый CacheScope неверен, но семантически неверен, что означает, что использование изменяемого CacheScope может нарушить цель использования AsyncLocal<T> в ваш кэш.

AsyncLocal<T> предназначен для обеспечения доступа к окружающим данным, локальным для асинхронного c потока управления. Использование изменяемого типа данных в AsyncLocal<T> может привести к тому, что такие локальные данные выйдут из его области действия.

Например, рассмотрим это

static class Cache
{
    private static AsyncLocal<ImmutableStack<CacheScope>> StackValue 
        = new AsyncLocal<ImmutableStack<CacheScope>>();

    public static ImmutableStack<CacheScope> CurrentStack
    {
        get => StackValue.Value;
        set => StackValue.Value = value;
    }

    public static void Push(CacheScope scope)
    {
        CurrentStack = CurrentStack.Push(scope);
    }

    public static CacheScope Peek()
    {
        return CurrentStack.Peek();
    }

    public static void Pop()
    {
        CurrentStack = CurrentStack.Pop();
    }
}

Предполагая, что есть два метода:

async Task Method1()
{
    // Push scope where Value = Method1
    Cache.Push(new CacheScope { Value = "Method1" });

    // Call method2
    await Method2();

    // Unexpected: value = Method2 
    var value = Cache.CurrentStack.Peek().Value;

    Cache.Pop();
}

async Task Method2()
{
    await Task.Delay(10);

    var scope = Cache.CurrentStack.Peek();
    scope.Value = "Method2";
}

Method2 - это новый асинхронный поток c, который имеет свой собственный контекст логического вызова, скопированный из Method1. Однако, поскольку копия является мелкой копией, два контекста будут иметь одинаковые CacheScope экземпляры в ImmutableStack. Мутации, сделанные в Method2, могут быть неожиданно отражены в Method1.

И помните, мы могли бы также сделать fork / join :

async Task Method1()
{
    Cache.Push(new CacheScope { Value = "Method1" });

    await Task.WhenAll(Method2(), Method3());

    Cache.Pop();
}

async Task Method2()
{
    await Task.Delay(10);

    var scope = Cache.CurrentStack.Peek();
    scope.Value = "Method2";

    Console.WriteLine($"Method2 - {scope.Value}");
}

async Task Method3()
{
    await Task.Delay(10);

    var scope = Cache.CurrentStack.Peek();

    Console.WriteLine($"Method3 - {scope.Value}");
}

Вы могли видеть два набора результатов:

Method3 - Method1
Method2 - Method2

и

Method3 - Method2
Method2 - Method2

Мутация, созданная в Method2, неожиданно переходит в Method3.

Такое неожиданное поведение может быть проблематичным c до вашего кэша. Было бы очень трудно найти ошибки, связанные с неявным контекстом, когда стек вызовов довольно большой, поэтому Стивен предложил:

Если вы выполните go по этому маршруту, я рекомендую писать много и множество юнит-тестов.

...