Как рассчитать пропускную способность, необходимую для задачи массового импорта в CosmosDB? - PullRequest
0 голосов
/ 26 апреля 2020

Обновление

Я думаю, что моя проблема может быть связана с клиентом. Закрыв Visual Studio 2019 и снова запустив его, я могу получить полностью загруженный файл (все 2 миллиона записей). Однако, если я попытаюсь запустить снова, произойдет сбой с проблемой, как описано ниже.

Azure DocumentDB время от времени выдает исключение SocketException / GoneException

Оригинальный выпуск

I'm Попытка массовой загрузки некоторых файлов данных в БД космос. Я использую пример, который нашел здесь:

https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started/tree/master/BulkImportSample

В примере импорта они создают поддельные документы с al oop. В моем случае я читаю файлы CSV из каталога. Каждая строка файла CSV преобразуется в один документ CosmosDB.

Пропускная способность по умолчанию была 400, но я увеличил ее до 10000. После импорта мне она не нужна, но я могу установить для нее все, что требуется для задачи массового импорта. Несмотря на это, у меня возникла какая-то проблема с пропускной способностью или ограничением, и я не совсем понимаю, как определить, какая пропускная способность мне нужна для импорта данных. Каждый из этих файлов CSV содержит около 2 миллионов строк, но каждая строка содержит только 10 скалярных значений.

начинает работать. Я получаю некоторый вывод из BulkExecutorTrace, например, «BulkExecutorTrace Information: 0: Index of Partition 0: 2452 | Работает на 2452 документах за 1 секунду при 19342.86 RU / s с 4 задачами. Столкнулся с 0 дросселями»

Но затем после 19 строки вывода, такие как, я получаю:

DocDBTrace Предупреждение: 0: Брошено исключение: 'System.Threading.Tasks.TaskCanceledException' в mscorlib.dll BulkExecutorTrace Информация: 0: RNTBD тайм-аут вызова на канале 192.168.11.105:19676 -> 40.78.226.8:14319. Ошибка: Индекс раздела ReceiveTimeout 0: 22068 | Работает на 1226 документах за 1 секунду при 9652,88 RU / с с 20 задачами. Столкнулся с 0 дросселями Исключение: «Microsoft. Azure .Documents.TransportException» в Microsoft. Azure .Documents.Client.dll Исключение: «Microsoft. Azure .Documents.TransportException» в mscorlib.dll Исключение: «Microsoft. Azure .Documents.TransportException» в mscorlib.dll Информация DocDBTrace: 0: RequestAsyn c не удалось: RID: dbs / Diseases / colls / Diseases / sprocs / __. Sys.commonBulkInsert, тип ресурса: StoredProcedure, Op : (operationType: Execute JavaScript, resourceType: StoredProcedure), Адрес: rntbd: //cdb-ms-prod-eastus1-fd10.documents.azure.com: 14319 / apps / 00e9d5e0-018e-43a2-b5a4- f41c78498cdb / services / 61a05d2a-fb30-455f-864e-c9e10e85684c / partitions / 92daa841-dcc5-40f0-9c21-2956cda4d2ac / replicas / 132323601201339145p /, исключение: возникла ошибка клиента Microsoft. Azure .Export:. время ожидания запроса истекло при ожидании ответа сервера. (Время: 2020-04-26T18: 18: 32.9586759Z, идентификатор операции: 80910ba7-8b36-40fa-a3bf-3eac239b00e2, код ошибки: ReceiveTimeout [0x0010], базовая ошибка: HRESULT 0x80131500, URI: rntbd: // cdb-ms -prod-eastus1-fd10.documents azure .com:. 14319 / приложения / 00e9d5e0-018e-43a2-b5a4-f41c78498cdb / услуги / 61a05d2a-fb30-455f-864e-c9e10e85684c / перегородки / 92daa841-dcc5-40f0-9c21 -2956cda4d2ac / replicas / 132323601201339145p /, соединение: 192.168.11.105:19676 -> 40.78.226.8:14319, полезная нагрузка отправлена: True, история процессора: (2020-04-26T18: 18: 12.7679827Z 80.069), (2020-04- 26T18: 18: 22.7667638Z 28.038), (2020-04-26T18: 18: 22.7672671Z 100.000), (2020-04-26T18: 18: 22.7672671Z 0.000), (2020-04-26T18: 18: 22.7672671Z 0.000) , (2020-04-26T18: 18: 32.7701961Z 20.629), количество процессоров: 8) в Microsoft. Azure .Documents.Rntbd.Channel.d__13.MoveNext () --- Конец трассировки стека из предыдущего расположения, где исключение был брошен --- в System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (Задача) в System.Runtime.CompilerServices.TaskAwaiter.HandleNonSucce ssAndDebuggerNotification (Задача) в Microsoft. Azure .Documents.Rntbd.LoadBalancingPartition.d__9.MoveNext ()

private async Task RunBulkImportAsync()
    {
        DocumentCollection dataCollection = null;

        try
        {
            dataCollection = GetCollectionIfExists(client, DatabaseName, CollectionName);
            if (dataCollection == null)
            {
                throw new Exception("The data collection does not exist");
            }
        }
        catch (Exception de)
        {
            Trace.TraceError("Unable to initialize, exception message: {0}", de.Message);
            throw;
        }

        string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");

        // Set retry options high for initialization (default values).
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

        IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
        await bulkExecutor.InitializeAsync();

        // Set retries to 0 to pass control to bulk executor.
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

        BulkImportResponse bulkImportResponse = null;
        long totalNumberOfDocumentsInserted = 0;
        double totalRequestUnitsConsumed = 0;
        double totalTimeTakenSec = 0;

        var tokenSource = new CancellationTokenSource();
        var token = tokenSource.Token;

        foreach (string d in Directory.GetDirectories(RootPath).Take(1))
        {

            foreach (string f in Directory.GetFiles(d).Take(1))
            {

                Trace.WriteLine("Processing file, " + f);

                var lines = File.ReadAllLines(f);

                Trace.WriteLine("File has " + lines.Count() + "lines");

                List<RowToImport> dataToImport = lines
                                       .Skip(1)
                                       .Select(v => RowToImport.FromCsv(v))
                                       .ToList();

                List<string> documentsToImportInBatch = dataToImport.Select(dti => GenerateJsonDocument(Guid.NewGuid().ToString(), dti.Disease, dti.Year, dti.Age, dti.Country, dti.CountryName, dti.CohortSize, dti.DeathsCongenital)).ToList();

                // Invoke bulk import API.

                var tasks = new List<Task>();

                tasks.Add(Task.Run(async () =>
                {
                    Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", f));

                    do
                    {
                        try
                        {
                            bulkImportResponse = await bulkExecutor.BulkImportAsync(
                                documents: documentsToImportInBatch,
                                enableUpsert: true,
                                disableAutomaticIdGeneration: true,
                                maxConcurrencyPerPartitionKeyRange: 100,
                                maxInMemorySortingBatchSize: null,
                                cancellationToken: token);
                        }
                        catch (DocumentClientException de)
                        {
                            Trace.TraceError("Document client exception: {0}", de);
                            //Console.WriteLine("Document client exception: {0} {1}", f, de);
                            break;
                        }
                        catch (Exception e)
                        {
                            Trace.TraceError("Exception: {0}", e);
                            //Console.WriteLine("Exception: {0} {1}", f, e);
                            break;
                        }
                    } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count) ;


                    Trace.WriteLine(String.Format("\nSummary for batch {0}:", f));
                    Trace.WriteLine("--------------------------------------------------------------------- ");
                    Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
                        bulkImportResponse.NumberOfDocumentsImported,
                        Math.Round(bulkImportResponse.NumberOfDocumentsImported / bulkImportResponse.TotalTimeTaken.TotalSeconds),
                        Math.Round(bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.TotalTimeTaken.TotalSeconds),
                        bulkImportResponse.TotalTimeTaken.TotalSeconds));
                    Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
                        (bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.NumberOfDocumentsImported)));
                    Trace.WriteLine("---------------------------------------------------------------------\n ");

                    totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
                    totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
                    totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
                },
                token));

                await Task.WhenAll(tasks);

            }

            Trace.WriteLine("Overall summary:");
            Trace.WriteLine("--------------------------------------------------------------------- ");
            Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
                totalNumberOfDocumentsInserted,
                Math.Round(totalNumberOfDocumentsInserted / totalTimeTakenSec),
                Math.Round(totalRequestUnitsConsumed / totalTimeTakenSec),
                totalTimeTakenSec));
            Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
                (totalRequestUnitsConsumed / totalNumberOfDocumentsInserted)));
            Trace.WriteLine("--------------------------------------------------------------------- ");


            Trace.WriteLine("\nPress any key to exit.");
            Console.ReadKey();
        }

    }

В циклах каталогов и файлов я сейчас использую Take (1), чтобы попытаться заставить работать один файл. Но на самом деле в каждой есть 4 директории и почти сотня файлов.

Любой совет, на который вы можете мне указать, о том, как мне нужно регулировать эту вещь, чтобы заставить ее импортировать все эти данные?

1 Ответ

0 голосов
/ 27 апреля 2020

Это исключение не относится к пропускной способности. Это исключение указывает на проблему тайм-аута / соединения, о которой говорится на странице устранения неполадок SDK 1002 *

В вашем исключении мы видим, что имеется всплеск ЦП:

CPU history: 
(2020-04-26T18:18:12.7679827Z 80.069), 
(2020-04-26T18:18:22.7667638Z 28.038), 
(2020-04-26T18:18:22.7672671Z 100.000), 
(2020-04-26T18:18:22.7672671Z 0.000), 
(2020-04-26T18:18:22.7672671Z 0.000), 
(2020-04-26T18:18:32.7701961Z 20.629)

Это может привести к проблемам с подключением. Если вы выполняете это на локальном компьютере разработчика, посмотрите, какие другие процессы могут потреблять процессор. Если это выполняется на ВМ, может потребоваться больший пул ЦП.

Кроме того, на основе вашего кода вы используете Bulk Executor для одновременных операций (вы создаете несколько задач параллельно).

Советы по производительности Bulk Executor указывают что это не должно быть сделано:

Так как выполнение API единой массовой операции потребляет большую часть ЦП клиентского компьютера и сетевой ввод-вывод (это происходит при внутреннем создании нескольких задач). Избегайте порождения нескольких параллельных задач в процессе приложения, которые выполняют массовые API-вызовы операций. Если один вызов API массовых операций, выполняемый на одной виртуальной машине, не может использовать пропускную способность всего контейнера (если пропускная способность вашего контейнера> 1 млн. RU / с), предпочтительно создать отдельные виртуальные машины для одновременного выполнения API массовых операций. звонки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...