Код имеет серьезные проблемы. Например, для 100 элементов запускается 100 одновременных операций, 4/8 одновременно. Код внутри l oop, кажется, читает документ из CosmosDB, устанавливает все его свойства, затем вызывает метод с именем, аналогичным DocumentClient.UpsertDocumentAsyn c, который не нужен Предварительная загрузка ничего. Не зная, что такое _cosmosDbRepository
и что делают его методы, можно только догадываться. Возможно, это создает дополнительные конфликты, хотя, пытаясь заблокировать что-либо во время (возможно, бесполезного) цикла загрузки / обновления.
Для начала AsParallel()
предназначено только для параллелизма данных: разделить некоторые данные в памяти и используйте столько рабочих, сколько есть ядер для каждого раздела. Здесь нет данных, только вызовы asyn c операций. Вот почему для 100 элементов этот код будет запускать 100 одновременных задач.
Это может привести к достижению любого количества ограничений регулирования CosmosDB, даже если это не вызывает конфликтов параллелизма. Это также может привести к проблемам network , поскольку один и тот же кабель используется для всех этих одновременных подключений.
Не принимая во внимание CosmosDB, правильный способ сделать множество вызовов удаленной службе - поставить их в очередь и выполнить с ограниченным числом работников. Это очень легко сделать с помощью NET s ActionBlock . Код может измениться примерно так:
class Payload
{
public string SessionKey{get;set;}
public string Key{get;set;}
public string Name{get;set;}
public string Value{get;set;}
public int TTL{get;set;}
}
//Allow only 10 concurrent upserts
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
};
var upsertBlock=new ActionBlock<Payload>(myPosterAsync,options);
foreach(var payload in payloads)
{
block.Post(pair);
}
//Tell the block we're done
block.Complete();
//Await for all queued operations to complete
await block.Completion;
Где myPosterAsync
содержит код сообщения:
async Task myPosterAsync(Payload item)
{
try
{
var doc = await _cosmosDbRepository.GetItemByKeyAsync(GetId(item.SessionId, item.Key),
item.SessionId)
?? new Document();
doc.SetPropertyValue("_partitionKey", item.SessionId);
doc.SetPropertyValue("key", GetId(sessionId, item.Key));
doc.SetPropertyValue("name", item.Name);
doc.SetPropertyValue("value", item.Value);
doc.TimeToLive = item.TTL;
await _cosmosDbRepository.UpsertDocumentAsync(doc, "_partitionKey");
catch (Exception ex)
{
//Handle the error in some way, eg log it
ApplicationInsightsLogger.TrackException(ex, new Dictionary<string, string>
{
{ "sessionID", item.SessionId },
{ "action", "TryStoreItems" }
});
}
}