Event Hub - не получается ожидаемая пропускная способность - PullRequest
0 голосов
/ 05 апреля 2019

Я установил экземпляр Event Hub с 20 единицами пропускной способности и 32 разделами на стандартном уровне. Согласно документации, каждая единица пропускной способности равна 1 МБ / с. Поэтому в идеале я должен получать пропускную способность 20 МБ / с или 1,2 ГБ / мин. Пространство имен имеет только один концентратор событий, и я единственный пользователь. Концентратор событий установлен в западной части США, и этот вариант наиболее близок к тому, откуда отправляются запросы.

Тем не менее, я вижу, что требуется не менее 10 минут для 1,77 ГБ данных. Я использую асинхронные пакетные вызовы и упаковываю каждый запрос до предела 1 МБ. Я вижу огромную разницу во времени, потребляемом вызовом SendBatchAsync - он варьируется от 0,15 до 25 секунд.

Вот мой код: (Обратите внимание: я вынужден использовать .Net Framework 4.5)

    static EventHubClient eventHubClient;        

    static Dictionary<int, List<EventData>> events = new Dictionary<int, List<EventData>>();
    static Dictionary<int, long> batchSizes = new Dictionary<int, long>();

    static long threshold = (long)(1e6 - 1000);

    static SemaphoreSlim concurrencySemaphore;
    static int maxConcurrency = 1;
    static void Main()
    {
        eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);

        Stopwatch stopWatch = new Stopwatch();
        stopWatch.Start();            

        using (concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
        {
            foreach (string record in GetRecords())
            {
                Tuple<int, EventData> currentEventDetails = GetEventData(record);
                int partitionId = currentEventDetails.Item1;
                EventData currentEvent = currentEventDetails.Item2;

                BatchOrSendAsync(partitionId, currentEvent);

            }

            SendRemainingAsync();
        }

        stopWatch.Stop();
        Console.WriteLine(string.Format("### total time taken = {0}", stopWatch.Elapsed.TotalSeconds.ToString()));

    }

    static async void BatchOrSendAsync(int partitionId, EventData currentEvent)
    {
        long batchSize = 0;

        batchSizes.TryGetValue(partitionId, out batchSize);
        long currentEventSize = currentEvent.SerializedSizeInBytes;

        if( batchSize + currentEventSize > threshold)
        {
            List<EventData> eventsToSend = events[partitionId];
            if (eventsToSend == null || eventsToSend.Count == 0)
            {
                if (currentEventSize > threshold)
                    throw new Exception("found event with size above threshold");
                return;
            }

            concurrencySemaphore.Wait();
            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();
            await eventHubClient.SendBatchAsync(eventsToSend);
            stopWatch.Stop();
            Console.WriteLine(stopWatch.Elapsed.TotalSeconds.ToString());
            concurrencySemaphore.Release();

            events[partitionId] = new List<EventData> { currentEvent };
            batchSizes[partitionId] = currentEventSize;
        }
        else
        {
            if (!events.ContainsKey(partitionId))
            { 
                events[partitionId] = new List<EventData>();
                batchSizes[partitionId] = 0;
            }
            events[partitionId].Add(currentEvent);
            batchSizes[partitionId] += currentEventSize;
        }
    }

    static async void SendRemainingAsync()
    {
        foreach(int partitionId in events.Keys)
        {                
            concurrencySemaphore.Wait();
            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();                
            await eventHubClient.SendBatchAsync(events[partitionId]);
            stopWatch.Stop();
            Console.WriteLine(stopWatch.Elapsed.TotalSeconds.ToString());
            concurrencySemaphore.Release();
        }
    }

Примечание: увеличение maxConcurrency для семафора ухудшает только общее время, и вызов SendBatchAsync начинает давать сбой, когда maxConcurrency равно 10

Что я должен сделать, чтобы улучшить пропускную способность?

...