Космические вставки не будут эффективно распараллеливаться - PullRequest
1 голос
/ 12 октября 2019

Мета-вопрос:
Мы извлекаем данные из EventHub, запускаем некоторую логику и сохраняем их в космосе. В настоящее время вставки Cosmos являются нашим узким местом. Как нам максимизировать нашу пропускную способность?

Подробности
Мы пытаемся оптимизировать пропускную способность нашего Космоса, и в SDK, похоже, есть некоторое противоречие, которое делаетпараллельные вставки только незначительно быстрее последовательных вставок.
Мы логически делаем:

            for (int i = 0; i < insertCount; i++)
            {
                taskList.Add(InsertCosmos(sdkContainerClient));
            }
            var parallelTimes = await Task.WhenAll(taskList);

Вот результаты, сравнивающие последовательные вставки, параллельные вставки и «подделку» вставки (с помощью Task.Delay):

Serial took: 461ms for 20
 - Individual times 28,8,117,19,14,11,10,12,5,8,9,11,18,15,79,23,14,16,14,13

Cosmos Parallel
Parallel took: 231ms for 20
 - Individual times 17,15,23,39,45,52,72,74,80,91,96,98,108,117,123,128,139,146,147,145

Just Parallel (no cosmos)
Parallel took: 27ms for 20
 - Individual times 27,26,26,26,26,26,26,25,25,25,25,25,25,24,24,24,23,23,23,23
  • Последовательность очевидна (просто добавьте каждое значение)
  • нет космоса (последнее время) также очевидна (достаточно минимального времени)
  • Но параллельный космос не распараллеливается почти так же хорошо , что указывает на некоторую конкуренцию.

Мы запускаем это на виртуальной машине в Azure (в том же центре обработки данных, что и в Космосе), достаточноRU не получают 429, а используют Microsoft.Azure.Cosmos 3.2.0.

Полный пример кода

    class Program
    {
        public static void Main(string[] args)
        {
            CosmosWriteTest().Wait();
        }

        public static async Task CosmosWriteTest()
        {
            var cosmosClient = new CosmosClient("todo", new CosmosClientOptions { ConnectionMode = ConnectionMode.Direct });
            var database = cosmosClient.GetDatabase("<ourcontainer>");
            var sdkContainerClient = database.GetContainer("<ourcontainer>");
            int insertCount = 25;
            //Warmup
            await sdkContainerClient.CreateItemAsync(new TestObject());

            //---Serially inserts into Cosmos---
            List<long> serialTimes = new List<long>();
            var serialTimer = Stopwatch.StartNew();
            Console.WriteLine("Cosmos Serial");
            for (int i = 0; i < insertCount; i++)
            {
                serialTimes.Add(await InsertCosmos(sdkContainerClient));
            }
            serialTimer.Stop();
            Console.WriteLine($"Serial took: {serialTimer.ElapsedMilliseconds}ms for {insertCount}");
            Console.WriteLine($" - Individual times {string.Join(",", serialTimes)}");

            //---Parallel inserts into Cosmos---
            Console.WriteLine(Environment.NewLine + "Cosmos Parallel");
            var parallelTimer = Stopwatch.StartNew();
            var taskList = new List<Task<long>>();
            for (int i = 0; i < insertCount; i++)
            {
                taskList.Add(InsertCosmos(sdkContainerClient));
            }
            var parallelTimes = await Task.WhenAll(taskList);

            parallelTimer.Stop();
            Console.WriteLine($"Parallel took: {parallelTimer.ElapsedMilliseconds}ms for {insertCount}");
            Console.WriteLine($" - Individual times {string.Join(",", parallelTimes)}");

            //---Testing parallelism minus cosmos---
            Console.WriteLine(Environment.NewLine + "Just Parallel (no cosmos)");
            var justParallelTimer = Stopwatch.StartNew();
            var noCosmosTaskList = new List<Task<long>>();
            for (int i = 0; i < insertCount; i++)
            {
                noCosmosTaskList.Add(InsertCosmos(sdkContainerClient, true));
            }
            var justParallelTimes = await Task.WhenAll(noCosmosTaskList);

            justParallelTimer.Stop();
            Console.WriteLine($"Parallel took: {justParallelTimer.ElapsedMilliseconds}ms for {insertCount}");
            Console.WriteLine($" - Individual times {string.Join(",", justParallelTimes)}");
        }

        //inserts 
        private static async Task<long> InsertCosmos(Container sdkContainerClient, bool justDelay = false)
        {
            var timer = Stopwatch.StartNew();
            if (!justDelay)
                await sdkContainerClient.CreateItemAsync(new TestObject());
            else
                await Task.Delay(20);

            timer.Stop();
            return timer.ElapsedMilliseconds;
        }

        //Test object to save to Cosmos
        public class TestObject
        {
            public string id { get; set; } = Guid.NewGuid().ToString();
            public string pKey { get; set; } = Guid.NewGuid().ToString();
            public string Field1 { get; set; } = "Testing this field";
            public double Number { get; set; } = 12345;
        }
    }

1 Ответ

1 голос
/ 12 октября 2019

Это сценарий, для которого вводится Bulk. Массовый режим в настоящее время находится в режиме предварительного просмотра и доступен в пакете 3.2.0-preview2 .

Чтобы воспользоваться преимуществами функции Массовая передача, вам необходимо включить флаг AllowBulkExecution:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true } );

Этот режим был создан для использования описанного вами сценария, списка одновременных операций, для которых требуется пропускная способность.

Здесь у нас есть пример проекта: https://github.com/Azure/azure-cosmos-dotnet-v3/tree/master/Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport

И мы все еще работаем над официальной документацией, но идея заключается в том, что когда выполняются параллельные операции, а не выполняются как отдельные запросыКак вы видите сейчас, SDK сгруппирует их в соответствии со схожестью разделов и выполнит их как групповые (пакетные) операции, сократив вызовы бэкэнд-службы и потенциально увеличивая пропускную способность между 50% -100% в зависимости от объема операций. Этот режим будет потреблять больше RU / s , так как он требует большего объема операций в секунду, чем выполнение операций по отдельности (поэтому, если вы нажмете 429s, это означает, что узкое место теперь находится в предоставленных RU / s).

var cosmosClient = new CosmosClient("todo", new CosmosClientOptions { AllowBulkExecution = true });
var database = cosmosClient.GetDatabase("<ourcontainer>");
var sdkContainerClient = database.GetContainer("<ourcontainer>");
//The more operations the better, just 25 might not yield a great difference vs non bulk
int insertCount = 10000;
//Don't do any warmup

List<Task> operations = new List<Tasks>();
var timer = Stopwatch.StartNew();
for (int i = 0; i < insertCount; i++)
{
    operations.Add(sdkContainerClient.CreateItemAsync(new TestObject()));
}

await Task.WhenAll(operations);
serialTimer.Stop();

Важно: Эта функция все еще находится в режиме предварительного просмотра. Поскольку этот режим оптимизирован для пропускной способности (а не задержки), любая отдельная отдельная операция, которую вы выполняете, не будет иметь большой операционной задержки.

Если вы хотите еще больше оптимизировать, а ваш источник данных позволяет вам получить доступПотоки (избегайте сериализации), вы можете использовать методы CreateItemStream SDK для еще большей пропускной способности.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...