Я работаю над чем-то вроде контекстного кэширования и немного застрял в поточной безопасности ...
Допустим, у меня есть следующий код:
public class AsynLocalContextualCacheAccessor : IContextualCacheAccessor
{
private static readonly AsyncLocal<CacheScopesManager> _rCacheContextManager = new AsyncLocal<CacheScopesManager>();
public AsynLocalContextualCacheAccessor()
{
}
public CacheScope Current
{
get
{
if (_rCacheContextManager.Value == null)
_rCacheContextManager.Value = new CacheScopesManager();
return _rCacheContextManager.Value.Current;
}
}
}
public class CacheScopesManager
{
private static readonly AsyncLocal<ImmutableStack<CacheScope>> _scopesStack = new AsyncLocal<ImmutableStack<CacheScope>>(OnValueChanged);
public CacheScopesManager()
{
CacheScope contextualCache = _NewScope();
_scopesStack.Value = ImmutableStack.Create<CacheScope>();
_scopesStack.Value = _scopesStack.Value.Push(contextualCache);
}
public CacheScope Current
{
get
{
if (_scopesStack.Value.IsEmpty)
return null;
CacheScope current = _scopesStack.Value.Peek();
if (current.IsDisposed)
{
_scopesStack.Value = _scopesStack.Value.Pop();
return Current;
}
// Create a new scope if we entered the new physical thread in the same logical thread
// in order to update async local stack and automatically have a new scope per every logically new operation
int currentThreadId = Thread.CurrentThread.ManagedThreadId;
if (currentThreadId != current.AcquiredByThread)
{
current = _NewScope();
_scopesStack.Value = _scopesStack.Value.Push(current);
}
return current;
}
}
private static void OnValueChanged(AsyncLocalValueChangedArgs<ImmutableStack<CacheScope>> args)
{
// Manual is not interesting to us.
if (!args.ThreadContextChanged)
return;
ImmutableStack<CacheScope> currentStack = args.CurrentValue;
ImmutableStack<CacheScope> previousStack = args.PreviousValue;
int threadId = Thread.CurrentThread.ManagedThreadId;
int threadIdCurrent = args.CurrentValue?.Peek().AcquiredByThread ?? -1;
int threadIdPrevious = args.PreviousValue?.Peek().AcquiredByThread ?? -1;
// Be sure in disposing of the scope
// This situation means a comeback of the previous execution context, in case if in the previous scope Current was used.
if (currentStack != null && previousStack != null
&& currentStack.Count() > previousStack.Count())
currentStack.Peek().Dispose();
}
}
И я пытаюсь выполнить следующий тест:
[TestMethod]
[TestCategory(TestCategoryCatalogs.UnitTest)]
public async Task AsyncLocalCacheManagerAccessor_request_that_processed_by_more_than_by_one_thread_is_threadsafe()
{
IContextualCacheAccessor asyncLocalAccessor = new AsynLocalContextualCacheAccessor();
Task requestAsyncFlow = Task.Run(async () =>
{
string key1 = "key1";
string value1 = "value1";
string key2 = "key2";
string value2 = "value2";
CacheScope scope1 = asyncLocalAccessor.Current;
string initialKey = "k";
object initialVal = new object();
scope1.Put(initialKey, initialVal);
scope1.TryGet(initialKey, out object result1).Should().BeTrue();
result1.Should().Be(initialVal);
var parallel1 = Task.Run(async () =>
{
await Task.Delay(5);
var cache = asyncLocalAccessor.Current;
cache.TryGet(initialKey, out object result2).Should().BeTrue();
result2.Should().Be(initialVal);
cache.Put(key1, value1);
await Task.Delay(10);
cache.Items.Count.Should().Be(1);
cache.TryGet(key1, out string result11).Should().BeTrue();
result11.Should().Be(value1);
});
var parallel2 = Task.Run(async () =>
{
await Task.Delay(2);
var cache = asyncLocalAccessor.Current;
cache.StartScope();
cache.TryGet(initialKey, out object result3).Should().BeTrue();
result3.Should().Be(initialVal);
cache.Put(key2, value2);
await Task.Delay(15);
cache.Items.Count.Should().Be(1);
cache.TryGet(key2, out string result21).Should().BeTrue();
result21.Should().Be(value2);
});
await Task.WhenAll(parallel1, parallel2);
// Here is an implicit dependency from Synchronization Context, and in most cases
// the next code will be handled by a new thread, that will cause a creation of a new scope,
// as well as for any other await inside any async operation, which is quite bad:(
asyncLocalAccessor.Current.Items.Count.Should().Be(1);
asyncLocalAccessor.Current.TryGet(initialKey, out object result4).Should().BeTrue();
result4.Should().Be(initialVal);
});
await requestAsyncFlow;
asyncLocalAccessor.Current.Items.Count.Should().Be(0);
}
И на самом деле этот тест зеленый, но есть одна (или более) проблема. Итак, что я пытаюсь достичь, это создать стек областей для каждой новой операции asyn c (если к текущей области обращались), и когда эта операция завершится, мне нужно успешно вернуться к предыдущему стеку. Я сделал это на основе текущего идентификатора потока (потому что я не нашел другого способа сделать это автоматически, но мне не нравилось мое решение), но если было выполнено продолжение операции asyn c не в начальном потоке (неявная зависимость от текущего SynchronizationContext
), а в любом другом, тогда это вызывает создание новой области видимости, что очень плохо, как для меня.
Я был бы рад, если бы кто-то может подсказать, как это сделать правильно, большое спасибо! :)
UPD 1. Код обновлен для добавления static
для каждого поля AsyncLocal
, так как значение каждого AsyncLocal
получается из ExecutionContext.GetLocalValue()
, который является stati c, поэтому не stati c AsyncLocal просто избыточное давление памяти.
UPD 2. Спасибо, @weichch за ответ, поскольку комментарий может быть большим, я просто добавил дополнительную информацию прямо к вопросу. Итак, в моем случае logi c с инкапсулированным AsyncLocal
и тем, что может сделать клиент моего кода - он вызывает только Current
на IContextualCacheAccessor
, который получит экземпляр объекта в AsyncLocal<CacheScopesManager>
, AsyncLocal
используется здесь только для того, чтобы иметь один экземпляр CacheScopesManager
на логический запрос и делиться им по этому запросу, аналогично жизненному циклу области Io C -Container, но жизненный цикл такого объекта определяется с момента создания объекта до конец асинхронного потока c, в котором был создан этот объект. Или давайте подумаем о ASP NET Core, где у нас есть IHttpContext
, IHttpContext
, кажется, не является неизменным, но все еще используется как AsyncLocal
через IHttpContextAccessor
, не так ли? Подобно этому был разработан CacheScopesManager
.
Таким образом, если клиентский код для получения текущего CacheScope
может вызывать Current
только на IContextualCacheAccessor
, то в случае AsyncLocal
реализация IContextualCacheAccessor
стека вызовов попадет в следующий код:
public CacheScope Current
{
get
{
if (_scopesStack.Value.IsEmpty)
return null;
CacheScope current = _scopesStack.Value.Peek();
if (current.IsDisposed)
{
_scopesStack.Value = _scopesStack.Value.Pop();
return Current;
}
// Create a new scope if we entered the new physical thread in the same logical thread
// in order to update async local stack and automatically have a new scope per every logically new operation
int currentThreadId = Thread.CurrentThread.ManagedThreadId;
if (currentThreadId != current.AcquiredByThread)
{
current = _NewScope();
_scopesStack.Value = _scopesStack.Value.Push(current);
}
return current;
}
}
, и если другой поток решил использовать Current
, это приведет к созданию новой области, и поскольку ImmutableStack<CacheScope>
- это AsyncLocal, мы сохраняем стек предыдущего асинхронного * 1070. * вытекает из любых изменений, что означает, что когда мы вернемся к нему, стек будет в порядке без каких-либо повреждений (наверняка, если хаки не использовались). Все это было сделано для того, чтобы сделать стек областей безопасным, а не «AsyncLocal». Итак, ваш код
async Task Method1()
{
Cache.Push(new CacheScope { Value = "Method1" });
await Task.WhenAll(Method2(), Method3());
Cache.Pop();
}
async Task Method2()
{
await Task.Delay(10);
var scope = Cache.CurrentStack.Peek();
scope.Value = "Method2";
Console.WriteLine($"Method2 - {scope.Value}");
}
async Task Method3()
{
await Task.Delay(10);
var scope = Cache.CurrentStack.Peek();
Console.WriteLine($"Method3 - {scope.Value}");
}
в случае, если мой метод доступа используется, не вызовет мутации в потоке asyn c, которая будет отражена в другом (и добавление данных в область действия предыдущего асинхронного кода). c поток, до смены нити - нормально для меня). Но есть одна проблема, на самом деле, цель этого CacheScope
состоит в том, чтобы иметь некоторое хранилище, которое перебирает логический запрос и кэширует некоторые данные, и эти данные ограничиваются CacheScope
и будут извлечены из ссылочной памяти, как только сфера закончится. И я хочу минимизировать создание таких областей, что означает, что, если код выполнялся последовательно, не должно быть никаких причин для создания новой области, даже если продолжение какой-либо асинхронной операции c произошло в другом потоке, потому что логически код все еще «последовательный», и можно использовать одну и ту же область видимости для такого «последовательного» кода. Пожалуйста, поправьте меня, если я где-то ошибаюсь.
Но ваш ответ и объяснение действительно полезны и наверняка защитят других от ошибок. Кроме того, это помогло мне понять, что Стивен имел в виду:
Если вы выполните go по этому маршруту, я рекомендую написать множество и множество модульных тестов.
мой Engli sh плохой, и я подумал, что «маршрут» означает «ссылка на статью», теперь понимаю, что это довольно «путь» в этом контексте.
UPD 3. Добавлен код CacheScope
для лучшей картинки.
public class CacheScope : IDisposableExtended
{
private ICacheScopesManager _scopeManager;
private CacheScope _parentScope;
private Dictionary<string, object> _storage = new Dictionary<string, object>();
internal CacheScope(Guid id, int boundThreadId, ICacheScopesManager scopeManager,
CacheScope parentScope)
{
_scopeManager = scopeManager.ThrowIfArgumentIsNull(nameof(scopeManager));
Id = id;
AcquiredByThread = boundThreadId;
_parentScope = parentScope;
}
public Guid Id { get; }
public int AcquiredByThread { get; }
public IReadOnlyCollection<object> Items => _storage?.Values;
public bool IsDisposed { get; private set; } = false;
public bool TryExpire<TItem>(string key, out TItem expiredItem)
{
_AssertInstanceIsDisposed();
key.ThrowIfArgumentIsNull(nameof(key));
expiredItem = default(TItem);
try
{
expiredItem = (TItem)_storage[key];
}
catch (KeyNotFoundException)
{
// Even if item is present in parent scope it cannot be expired from inner scope.
return false;
}
_storage.Remove(key);
return true;
}
public TItem GetOrPut<TItem>(string key, Func<string, TItem> putFactory)
{
_AssertInstanceIsDisposed();
key.ThrowIfArgumentIsNull(nameof(key));
putFactory.ThrowIfArgumentIsNull(nameof(putFactory));
TItem result;
try
{
result = (TItem)_storage[key];
}
catch (KeyNotFoundException)
{
if (_parentScope != null && _parentScope.TryGet(key, out result))
return result;
result = putFactory(key);
_storage.Add(key, result);
}
return result;
}
public void Put<TItem>(string key, TItem item)
{
_AssertInstanceIsDisposed();
key.ThrowIfArgumentIsNull(nameof(key));
_storage[key] = item;
// We are not even thinking about to change the parent scope here,
// because parent scope should be considered by current as immutable.
}
public bool TryGet<TItem>(string key, out TItem item)
{
_AssertInstanceIsDisposed();
key.ThrowIfArgumentIsNull(nameof(key));
item = default(TItem);
try
{
item = (TItem)_storage[key];
}
catch (KeyNotFoundException)
{
return _parentScope != null && _parentScope.TryGet(key, out item);
}
return true;
}
public void Dispose()
{
if (IsDisposed)
return;
Dictionary<string, object> localStorage = Interlocked.Exchange(ref _storage, null);
if (localStorage == null)
{
// that should never happen but Dispose in general is expected to be safe to call so... let's obey the rules
return;
}
foreach (var item in localStorage.Values)
if (item is IDisposable disposable)
disposable.Dispose();
_parentScope = null;
_scopeManager = null;
IsDisposed = true;
}
public CacheScope StartScope() => _scopeManager.CreateScope(this);
}