. NET Базовое API, похоже, зашло в тупик - PullRequest
1 голос
/ 23 января 2020

У нас есть. NET Базовый API, работающий в производстве, который может работать стабильно в течение нескольких дней или даже недель, а затем внезапно зависает. Такое замораживание может происходить даже несколько раз в день, совершенно случайно. Что происходит: код кажется замороженным и не принимает новые запросы. Новые запросы не регистрируются, число потоков возрастает до небес, а объем памяти постоянно увеличивается до тех пор, пока не будет исчерпан.

Я создал дамп памяти для анализа. Это говорит мне о том, что большинство потоков ожидает снятия блокировки при определенной функции c, похожей на тупик. Я проанализировал эту функцию и не понимаю, почему это может вызвать проблемы. Кто-нибудь может мне помочь? Очевидно, я подозреваю, что AsParallel () является потокобезопасным, но inte rnet говорит нет, это потокобезопасный.

public async Task<bool> TryStorePropertiesAsync(string sessionId, Dictionary<string, string> keyValuePairs, int ttl = 1500)
{
    try
    {
        await Task.WhenAll(keyValuePairs.AsParallel().Select(async item => 
        {
            var doc = await _cosmosDbRepository.GetItemByKeyAsync(GetId(sessionId, item.Key), sessionId) ?? new Document();

            doc.SetPropertyValue("_partitionKey", sessionId);
            doc.SetPropertyValue("key", GetId(sessionId, item.Key));
            doc.SetPropertyValue("name", item.Key.ToLowerInvariant());
            doc.SetPropertyValue("value", item.Value);
            doc.TimeToLive = ttl;
            await _cosmosDbRepository.UpsertDocumentAsync(doc, "_partitionKey");
        }));
        return true;
    }
    catch
    {
        ApplicationInsightsLogger.TrackException(ex, new Dictionary<string, string>
        {
            { "sessionID", sessionId },
            { "action", "TryStoreItems" }
        });
        return false;
    }
}

1 Ответ

1 голос
/ 23 января 2020

Код имеет серьезные проблемы. Например, для 100 элементов запускается 100 одновременных операций, 4/8 одновременно. Код внутри l oop, кажется, читает документ из CosmosDB, устанавливает все его свойства, затем вызывает метод с именем, аналогичным DocumentClient.UpsertDocumentAsyn c, который не нужен Предварительная загрузка ничего. Не зная, что такое _cosmosDbRepository и что делают его методы, можно только догадываться. Возможно, это создает дополнительные конфликты, хотя, пытаясь заблокировать что-либо во время (возможно, бесполезного) цикла загрузки / обновления.

Для начала AsParallel() предназначено только для параллелизма данных: разделить некоторые данные в памяти и используйте столько рабочих, сколько есть ядер для каждого раздела. Здесь нет данных, только вызовы asyn c операций. Вот почему для 100 элементов этот код будет запускать 100 одновременных задач.

Это может привести к достижению любого количества ограничений регулирования CosmosDB, даже если это не вызывает конфликтов параллелизма. Это также может привести к проблемам network , поскольку один и тот же кабель используется для всех этих одновременных подключений.

Не принимая во внимание CosmosDB, правильный способ сделать множество вызовов удаленной службе - поставить их в очередь и выполнить с ограниченным числом работников. Это очень легко сделать с помощью NET s ActionBlock . Код может измениться примерно так:

class Payload
{
    public string SessionKey{get;set;}
    public string Key{get;set;}
    public string Name{get;set;}
    public string Value{get;set;}
    public int TTL{get;set;}
}

//Allow only 10 concurrent upserts
var options=new ExecutionDataflowBlockOptions
  {
     MaxDegreeOfParallelism = 10
  };
var upsertBlock=new ActionBlock<Payload>(myPosterAsync,options);

foreach(var payload in payloads)
{
    block.Post(pair);
}
//Tell the block we're done
block.Complete();
//Await for all queued operations to complete
await block.Completion;

Где myPosterAsync содержит код сообщения:

async Task myPosterAsync(Payload item)
{
    try
    {
        var doc = await _cosmosDbRepository.GetItemByKeyAsync(GetId(item.SessionId, item.Key), 
                                                          item.SessionId) 
                         ?? new Document();

        doc.SetPropertyValue("_partitionKey", item.SessionId);
        doc.SetPropertyValue("key", GetId(sessionId, item.Key));
        doc.SetPropertyValue("name", item.Name);
        doc.SetPropertyValue("value", item.Value);
        doc.TimeToLive = item.TTL;
        await _cosmosDbRepository.UpsertDocumentAsync(doc, "_partitionKey");
    catch (Exception ex)   
    {
        //Handle the error in some way, eg log it
        ApplicationInsightsLogger.TrackException(ex, new Dictionary<string, string>
        {
            { "sessionID", item.SessionId },
            { "action", "TryStoreItems" }
        });
    }
}
...