Обновление
Я думаю, что моя проблема может быть связана с клиентом. Закрыв Visual Studio 2019 и снова запустив его, я могу получить полностью загруженный файл (все 2 миллиона записей). Однако, если я попытаюсь запустить снова, произойдет сбой с проблемой, как описано ниже.
Azure DocumentDB время от времени выдает исключение SocketException / GoneException
Оригинальный выпуск
I'm Попытка массовой загрузки некоторых файлов данных в БД космос. Я использую пример, который нашел здесь:
https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started/tree/master/BulkImportSample
В примере импорта они создают поддельные документы с al oop. В моем случае я читаю файлы CSV из каталога. Каждая строка файла CSV преобразуется в один документ CosmosDB.
Пропускная способность по умолчанию была 400, но я увеличил ее до 10000. После импорта мне она не нужна, но я могу установить для нее все, что требуется для задачи массового импорта. Несмотря на это, у меня возникла какая-то проблема с пропускной способностью или ограничением, и я не совсем понимаю, как определить, какая пропускная способность мне нужна для импорта данных. Каждый из этих файлов CSV содержит около 2 миллионов строк, но каждая строка содержит только 10 скалярных значений.
начинает работать. Я получаю некоторый вывод из BulkExecutorTrace, например, «BulkExecutorTrace Information: 0: Index of Partition 0: 2452 | Работает на 2452 документах за 1 секунду при 19342.86 RU / s с 4 задачами. Столкнулся с 0 дросселями»
Но затем после 19 строки вывода, такие как, я получаю:
DocDBTrace Предупреждение: 0: Брошено исключение: 'System.Threading.Tasks.TaskCanceledException' в mscorlib.dll BulkExecutorTrace Информация: 0: RNTBD тайм-аут вызова на канале 192.168.11.105:19676 -> 40.78.226.8:14319. Ошибка: Индекс раздела ReceiveTimeout 0: 22068 | Работает на 1226 документах за 1 секунду при 9652,88 RU / с с 20 задачами. Столкнулся с 0 дросселями Исключение: «Microsoft. Azure .Documents.TransportException» в Microsoft. Azure .Documents.Client.dll Исключение: «Microsoft. Azure .Documents.TransportException» в mscorlib.dll Исключение: «Microsoft. Azure .Documents.TransportException» в mscorlib.dll Информация DocDBTrace: 0: RequestAsyn c не удалось: RID: dbs / Diseases / colls / Diseases / sprocs / __. Sys.commonBulkInsert, тип ресурса: StoredProcedure, Op : (operationType: Execute JavaScript, resourceType: StoredProcedure), Адрес: rntbd: //cdb-ms-prod-eastus1-fd10.documents.azure.com: 14319 / apps / 00e9d5e0-018e-43a2-b5a4- f41c78498cdb / services / 61a05d2a-fb30-455f-864e-c9e10e85684c / partitions / 92daa841-dcc5-40f0-9c21-2956cda4d2ac / replicas / 132323601201339145p /, исключение: возникла ошибка клиента Microsoft. Azure .Export:. время ожидания запроса истекло при ожидании ответа сервера. (Время: 2020-04-26T18: 18: 32.9586759Z, идентификатор операции: 80910ba7-8b36-40fa-a3bf-3eac239b00e2, код ошибки: ReceiveTimeout [0x0010], базовая ошибка: HRESULT 0x80131500, URI: rntbd: // cdb-ms -prod-eastus1-fd10.documents azure .com:. 14319 / приложения / 00e9d5e0-018e-43a2-b5a4-f41c78498cdb / услуги / 61a05d2a-fb30-455f-864e-c9e10e85684c / перегородки / 92daa841-dcc5-40f0-9c21 -2956cda4d2ac / replicas / 132323601201339145p /, соединение: 192.168.11.105:19676 -> 40.78.226.8:14319, полезная нагрузка отправлена: True, история процессора: (2020-04-26T18: 18: 12.7679827Z 80.069), (2020-04- 26T18: 18: 22.7667638Z 28.038), (2020-04-26T18: 18: 22.7672671Z 100.000), (2020-04-26T18: 18: 22.7672671Z 0.000), (2020-04-26T18: 18: 22.7672671Z 0.000) , (2020-04-26T18: 18: 32.7701961Z 20.629), количество процессоров: 8) в Microsoft. Azure .Documents.Rntbd.Channel.d__13.MoveNext () --- Конец трассировки стека из предыдущего расположения, где исключение был брошен --- в System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (Задача) в System.Runtime.CompilerServices.TaskAwaiter.HandleNonSucce ssAndDebuggerNotification (Задача) в Microsoft. Azure .Documents.Rntbd.LoadBalancingPartition.d__9.MoveNext ()
private async Task RunBulkImportAsync()
{
DocumentCollection dataCollection = null;
try
{
dataCollection = GetCollectionIfExists(client, DatabaseName, CollectionName);
if (dataCollection == null)
{
throw new Exception("The data collection does not exist");
}
}
catch (Exception de)
{
Trace.TraceError("Unable to initialize, exception message: {0}", de.Message);
throw;
}
string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");
// Set retry options high for initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();
// Set retries to 0 to pass control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
BulkImportResponse bulkImportResponse = null;
long totalNumberOfDocumentsInserted = 0;
double totalRequestUnitsConsumed = 0;
double totalTimeTakenSec = 0;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
foreach (string d in Directory.GetDirectories(RootPath).Take(1))
{
foreach (string f in Directory.GetFiles(d).Take(1))
{
Trace.WriteLine("Processing file, " + f);
var lines = File.ReadAllLines(f);
Trace.WriteLine("File has " + lines.Count() + "lines");
List<RowToImport> dataToImport = lines
.Skip(1)
.Select(v => RowToImport.FromCsv(v))
.ToList();
List<string> documentsToImportInBatch = dataToImport.Select(dti => GenerateJsonDocument(Guid.NewGuid().ToString(), dti.Disease, dti.Year, dti.Age, dti.Country, dti.CountryName, dti.CohortSize, dti.DeathsCongenital)).ToList();
// Invoke bulk import API.
var tasks = new List<Task>();
tasks.Add(Task.Run(async () =>
{
Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", f));
do
{
try
{
bulkImportResponse = await bulkExecutor.BulkImportAsync(
documents: documentsToImportInBatch,
enableUpsert: true,
disableAutomaticIdGeneration: true,
maxConcurrencyPerPartitionKeyRange: 100,
maxInMemorySortingBatchSize: null,
cancellationToken: token);
}
catch (DocumentClientException de)
{
Trace.TraceError("Document client exception: {0}", de);
//Console.WriteLine("Document client exception: {0} {1}", f, de);
break;
}
catch (Exception e)
{
Trace.TraceError("Exception: {0}", e);
//Console.WriteLine("Exception: {0} {1}", f, e);
break;
}
} while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count) ;
Trace.WriteLine(String.Format("\nSummary for batch {0}:", f));
Trace.WriteLine("--------------------------------------------------------------------- ");
Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
bulkImportResponse.NumberOfDocumentsImported,
Math.Round(bulkImportResponse.NumberOfDocumentsImported / bulkImportResponse.TotalTimeTaken.TotalSeconds),
Math.Round(bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.TotalTimeTaken.TotalSeconds),
bulkImportResponse.TotalTimeTaken.TotalSeconds));
Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
(bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.NumberOfDocumentsImported)));
Trace.WriteLine("---------------------------------------------------------------------\n ");
totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
},
token));
await Task.WhenAll(tasks);
}
Trace.WriteLine("Overall summary:");
Trace.WriteLine("--------------------------------------------------------------------- ");
Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
totalNumberOfDocumentsInserted,
Math.Round(totalNumberOfDocumentsInserted / totalTimeTakenSec),
Math.Round(totalRequestUnitsConsumed / totalTimeTakenSec),
totalTimeTakenSec));
Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
(totalRequestUnitsConsumed / totalNumberOfDocumentsInserted)));
Trace.WriteLine("--------------------------------------------------------------------- ");
Trace.WriteLine("\nPress any key to exit.");
Console.ReadKey();
}
}
В циклах каталогов и файлов я сейчас использую Take (1), чтобы попытаться заставить работать один файл. Но на самом деле в каждой есть 4 директории и почти сотня файлов.
Любой совет, на который вы можете мне указать, о том, как мне нужно регулировать эту вещь, чтобы заставить ее импортировать все эти данные?