ConcurrentDictionary - PullRequest
       0

ConcurrentDictionary

1 голос
/ 22 января 2012

Введение

Я создаю оболочку API для SE API 2.0
В настоящее время я реализую функцию кэширования, до сих пор это не было проблемой.Теперь я принимаю во внимание параллелизм.Это мой метод тестирования:

Код

public static void TestConcurrency()
{
    Stopwatch sw = new Stopwatch();
    sw.Start();
    IList<Task> tasks = new List<Task>();
    for (int i = 0; i < 1000; i++)
    {
        tasks.Add(Task.Factory.StartNew(p => client.GetAnswers(), null));
    }
    Task.WaitAll(tasks.ToArray());
    sw.Stop();
    Console.WriteLine("elapsed: {0}", sw.Elapsed.ToString());
    Console.ReadKey();
}

Описание

Внутренне, у клиента есть класс RequestHandler, который пытается извлечь значение из кэшаи, если это не удается сделать, он выполняет фактический запрос.

Код

/// <summary>
/// Checks the cache and then performs the actual request, if required.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result against which to deserialize JSON.</typeparam>
/// <param name="endpoint">The API endpoint to query.</param>
/// <returns>The API response object.</returns>
private IApiResponse<T> InternalProcessing<T>(string endpoint) where T : class
{
    IApiResponse<T> result = FetchFromCache<T>(endpoint);
    return result ?? PerformRequest<T>(endpoint);
}

Описание

Код, который фактически выполняет запрос, не имеет отношения к этой проблеме,Код, который пытается получить доступ к кешу, выполняет следующие действия:

Код

/// <summary>
/// Attempts to fetch the response object from the cache instead of directly from the API.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result against which to deserialize JSON.</typeparam>
/// <param name="endpoint">The API endpoint to query.</param>
/// <returns>The API response object.</returns>
private IApiResponse<T> FetchFromCache<T>(string endpoint) where T : class
{
    IApiResponseCacheItem<T> cacheItem = Store.Get<T>(endpoint);
    if (cacheItem != null)
    {
        IApiResponse<T> result = cacheItem.Response;
        result.Source = ResultSourceEnum.Cache;
        return result;
    }
    return null;
}

Описание

Фактическая реализация хранилища кеша работает на ConcurrentDictionary, когдаметод Get<T>() вызывается, я:

  • Проверьте, есть ли в словаре запись для endpoint.
  • Если он есть, я проверяю, содержит ли он ответobject.
  • Если у него еще нет объекта ответа, состояние элемента кэша будет Processing, и поток будет переведен в спящий режим на некоторое время, ожидая фактического запроса.завершено.
  • Один раз или если ответ «передается» в хранилище кэша (это происходит после завершения запроса), возвращается элемент кэша.
  • Если элемент кэша был слишком старили по истечении времени обработки запроса запись удаляется из хранилища.
  • Если в кеше нет записи для endpoint, null помещается в кеш в качестве ответа для endpointсигнализация запроса на этой конечной точке bПроцесс обработан, и нет необходимости выдавать больше запросов на одну и ту же конечную точку.Затем возвращается null, сигнализирующий о фактическом запросе.

Код

/// <summary>
/// Attempts to access the internal cache and retrieve a response cache item without querying the API.
/// <para>If the endpoint is not present in the cache yet, null is returned, but the endpoint is added to the cache.</para>
/// <para>If the endpoint is present, it means the request is being processed. In this case we will wait on the processing to end before returning a result.</para>
/// </summary>
/// <typeparam name="T">The strong type of the expected API result.</typeparam>
/// <param name="endpoint">The API endpoint</param>
/// <returns>Returns an API response cache item if successful, null otherwise.</returns>
public IApiResponseCacheItem<T> Get<T>(string endpoint) where T : class
{
    IApiResponseCacheItem cacheItem;
    if (Cache.TryGetValue(endpoint, out cacheItem))
    {
        while (cacheItem.IsFresh && cacheItem.State == CacheItemStateEnum.Processing)
        {
            Thread.Sleep(10);
        }
        if (cacheItem.IsFresh && cacheItem.State == CacheItemStateEnum.Cached)
        {
            return (IApiResponseCacheItem<T>)cacheItem;
        }
        IApiResponseCacheItem value;
        Cache.TryRemove(endpoint, out value);
    }
    Push<T>(endpoint, null);
    return null;
}

Проблема заключается в неопределенности, иногда два запроса делают это, вместо одногокак будто это должно произойти.

Я думаю, что где-то по пути к чему-то, что не является потокобезопасным, осуществляется доступ.Но я не могу определить, что это может быть.Что бы это могло быть, или как мне отладить это правильно?

Обновление

Проблема заключалась в том, что я не всегда был поточно-ориентированным на ConcurrentDictionary

Этот метод не перенастраивал boolean, указывающий, был ли кэш успешно обновлен, поэтому, если этот метод не удался, null был бы возвращен дважды Get<T>().

Код

/// <summary>
/// Attempts to push API responses into the cache store.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result.</typeparam>
/// <param name="endpoint">The queried API endpoint.</param>
/// <param name="response">The API response.</param>
/// <returns>True if the operation was successful, false otherwise.</returns>
public bool Push<T>(string endpoint, IApiResponse<T> response) where T : class
{
    if (endpoint.NullOrEmpty())
    {
        return false;
    }
    IApiResponseCacheItem item;
    if (Cache.TryGetValue(endpoint, out item))
    {
        ((IApiResponseCacheItem<T>)item).UpdateResponse(response);
        return true;
    }
    else
    {
        item = new ApiResponseCacheItem<T>(response);
        return Cache.TryAdd(endpoint, item);
    }
}

Описание

Решением было реализовать возвращаемое значение и изменить Get<T>(), добавив следующее:

Код

if (Push<T>(endpoint, null) || retries > 1) // max retries for sanity.
{
    return null;
}
else
{
    return Get<T>(endpoint, ++retries); // retry push.
}

1 Ответ

1 голос
/ 22 января 2012
IApiResponseCacheItem<T> cacheItem = Store.Get<T>(endpoint);
if (cacheItem != null)
{
   // etc..
}

ConcurrentDirectionary является поточно-ориентированным, но это не делает ваш код безопасным.Приведенный выше фрагмент является ядром проблемы.Два потока могут одновременно вызывать метод Get () и получать нулевое значение.Они оба продолжат и вызовут PerformRequest () одновременно.Вам необходимо объединить InternalProcessing () и FetchFromCache () и убедиться, что только один поток может вызвать PerformRequest с помощью блокировки.Это может привести к плохому параллелизму, возможно, вы просто отбросите дублирующийся ответ.По всей вероятности, запросы в любом случае сериализуются сервером SE, так что это, вероятно, не имеет значения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...